网站首页 > 开源技术 正文
# Kafka_深入探秘者(8):kafka 高级应用--1
段子手168
## 一、kafka 消费组管理
### 1、kafka 命令行工具
参考官网: http://kafka.apache.org/22/documentation.html
### 2、kafka 消费组管理:查看消费组
```java
# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者
cd /usr/local/kafka/kafka_2.12-2.2.1/
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --list
# 查看消费组命令
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
```
### 3、kafka 消费组管理:查看消费组详情
```java
# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者
cd /usr/local/kafka/kafka_2.12-2.2.1/
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --describe --group group.demo
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group group.demo
# 查看消费组详情命令(group.demo 是你自己创建的需要查看的组名)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo
```
### 4、kafka 消费组管理:查看消费组当前状态
```java
# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者
cd /usr/local/kafka/kafka_2.12-2.2.1/
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --describe --group group.demo --state
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group group.demo --state
# 查看消费组当前状态
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo --state
```
### 5、kafka 消费组管理:查看消费组内成员信息
```java
# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者
cd /usr/local/kafka/kafka_2.12-2.2.1/
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --describe --group group.demo --members
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group group.demo --members
# 查看消费组内成员信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo --members
```
### 6、kafka 消费组管理:删除消费组,如果有消费者在使用,则会删除失败。
```java
# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者
cd /usr/local/kafka/kafka_2.12-2.2.1/
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110 --delete --group group.demo
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --delete --group group.demo
# 删除消费组,如果有消费者在使用,则会删除失败
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group group.demo
```
### 7、kafka 消费位移管理:重置消费位移,如果有消费者在使用,则会重置失败。
1)kafka 重置消费位移命令:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute
2)命令参数说明:
--all-topics :指定了所有主题。
--reset-offsets :重置消费。
--to-earliest :移到最后
--execute :开始执行命令
```java
# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者
cd /usr/local/kafka/kafka_2.12-2.2.1/
# 重置消费位移,如果有消费者在使用,则会重置失败
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute
```
## 二、kafka 数据管道 Connect 文件系统
### 1、kafka 数据管道 Connect 文件系统 概述
1)Kafka 是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为何集成其他系统和解耦应用,经常使用 Producer 来发送消息到 Broker,并使用 Consumer 来消费 Broker 中的消息。Kafka Connect 是到 0.9 版本才提供的并极大的简化了其他系统与 Kafka 的集成。
2)Kafka Connect 运用用户快速定义并实现各种 Connector(File,dbc,Hdfs等),这些功能让大批量数据导入/导出 Kafka 很方便。
3)在 Kafka Connect 中还有两个重要的概念: Task 和 Worker。
4)Connect 中一些概念:
- 连接器: 实现了 ConnectAPI,决定需要运行多少个任务,按照任务来进行数据复制,从 work 进程获取任务配置并将其传递下去。
- 任务: 负责将数据移入或移出 Kafka。
- work 进程: 相当与 connector 和任务的容器,用于负责管理连接器的配置、启动连接器和连接器任务,提供 RESTAPI。
- 转换器: kafka connect 和其他存储系统直接发送或者接受数据之间转换数据。
### 2、kafka 独立模式-文件系统
场景
以下示例使用到了两个 Connector,将文件 source.txt 中的内容通过 Source 连接器写入 Kafka 主题中,然后将内容写入 srouce.sink.txt 中。
- FileStreamSource: 从 source.txt 中读取并发布到 Broker 中。
- FileStreamSink: 从 Broker 中读取数据并写入到 source.sink.txt 文件中。
### 3、步骤详情
#### 3.1、首先我们来看下 Worker 进程用到的配置文件
${(KAFKA_HOME}/config/connect-standalone.properties
```java
# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
// Kafka 集群连接的地址(把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128))
# bootstrap.servers=localhost:9092
bootstrap.servers=172.18.30.110:9092
# 打开并编辑 config/connect-standalone.properties 文件
vim config/connect-standalone.properties
# 修改以下几项:
# 启动端口,注意不能冲突
rest.port=8084
// 格式转化类
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
// json消息中是否包含schema
key.converter.schemas.enable=true
value.converter.schemas.enable.true
// 保存偏移量的文件路径
offset.storage.file.filename=/tmp/connect.offsets
// 设定提交偏移量的频率
offset.flush.interval.ms=10808
```
#### 3.2 其中的 Source 使用到的配置文件是 $KAFKA_HOME}/config/connect-file-source.properties
```java
# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 打开并编辑 config/connect-file-source.properties 文件
vim config/connect-file-source.properties
# 修改以下几项:
//配置连接器的名称
name.local-file-source
//连接器的全限定名称,设置类名称也是可以的
connector.class=FileStreamSource
// task数量
tasks .max=1
//数据源的文件路径(修改为你自己创建source.txt文件实际路径)
file=/tmp/source.txt
// 主题名称(修改为你自己创建的主题,如:heima)
# topic=topic0703
topic=heima
```
#### 3.3 其中的 Sink 使用到的配置文件是 $KAFKA_HOME}/config/connect-file-sink.properties
```java
# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 打开并编辑 config/connect-file-sink.properties 文件
vim config/connect-file-sink.properties
# 修改以下几项:
name=local-file-sink
connectér.class=FileStreamsink
tasks.max=1
# 修改为你自己创建sink.txt文件实际路径
#file=/tmp/source.sink.txt
file=/tmp/sink.txt
# 修改为你自己创建的主题,如:heima
#topics=topice703
topics=heima
```
#### 3.4 启动 source 连接器
```java
# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
```
#### 3.5 启动 slink 连接器
```java
# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
```
#### 3.6 source 写入文本信息
```java
# 写入消息到 source.txt
echo "hello kafka,coming">> /tmp/source.txt
# 查询是否写入成功
less /tmp/source.txt
# 同时查询sink.txt 是否有内容传输过来
cat /tmp/sink.txt
```
## 三、kafka springboot+kafka 001
### 1、打开 idea 创建 artifactId 名为 kafka_spring_learn 的 maven 工程。
```java
--> idea --> File
--> New --> Project
--> Maven
Project SDK: ( 1.8(java version "1.8.0_131" )
--> Next
--> Groupld : ( djh.it )
Artifactld : ( kafka_spring_learn )
Version : 1.0-SNAPSHOT
--> Name: ( kafka_spring_learn )
Location: ( ...\kafka_spring_learn\ )
--> Finish
```
### 2、在 kafka_spring_learn 工程的 pom.xml 文件中导入依赖坐标。
```java
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>djh.it</groupId>
<artifactId>kafka_spring_learn</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<!-- <relativePath></relativePath>-->
</parent>
<properties>
<java.version>8</java.version>
<!-- <scala.version>2.11</scala.version>-->
<scala.version>2.12</scala.version>
<slf4j.version>1.7.21</slf4j.version>
<!-- <kafka.version>2.0.0</kafka.version>-->
<kafka.version>2.8.0</kafka.version>
<lombok.version>1.18.8</lombok.version>
<junit.version>4.11</junit.version>
<gson.version>2.2.4</gson.version>
<protobuff.version>1.5.4</protobuff.version>
<!-- <spark.version>2.3.1</spark.version>-->
<spark.version>2.4.8</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.version}</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>${protobuff.version}</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>${protobuff.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>2.9.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
```
### 3、在 kafka_spring_learn 工程,创建 application.properties 配置文件。
```java
# kafka_spring_learn\src\main\resources\application.properties
logging.level.root=INFO
# spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
# IP 地址换成你自己的虚拟机 IP 地址
#spring.kafka.producer.bootstrap-servers=192.168.19.128:9092
#spring.kafka.consumer.bootstrap-servers=192.168.19.128:9092
spring.kafka.producer.bootstrap-servers=172.18.30.110:9092
spring.kafka.consumer.bootstrap-servers=172.18.30.110:9092
```
### 4、在 kafka_spring_learn 工程,创建 KafkaLearnApplication.java 启动类文件。
```java
/**
* kafka_spring_learn\src\main\java\djh\it\kafkalearn\KafkaLearnApplication.java
*
* 2024-6-24 创建 KafkaLearnApplication.java 启动类文件
*/
package djh.it.kafkalearn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@SpringBootApplication
@RequestMapping
//避免Gson版本冲突快捷配置
@EnableAutoConfiguration(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})
public class KafkaLearnApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLearnApplication.class);
public static void main(String[] args) {
SpringApplication.run(KafkaLearnApplication.class, args);
}
@RequestMapping("/index")
public String index(){
return "hello,kafka spring comming";
}
@Autowired
private KafkaTemplate template;
private static final String topic = "heima";
// 消息生产者
@GetMapping("/send/{input}")
public String sendToKafka(@PathVariable String input){
this.template.send(topic, input);
return "Send success! " + input;
}
//消息的接收
@KafkaListener(id ="", topics = topic, groupId = "group.demo")
public void listener(String input){
LOGGER.info("message input value:{}", input);
}
}
```
### 5、启动 KafkaLearnApplication.java 启动类,进行测试。
1)浏览器地址栏输入: localhost:8080/index
输出: hello,kafka spring comming
2)浏览器地址栏输入: localhost:8080/send/kafka
输出: Send success! kafka
**`上一节关联链接请点击`**
# Kafka_深入探秘者(7):kafka 稳定性_kafka稳定性-CSDN博客
[# Kafka_深入探秘者(7):kafka 稳定性](https://dzs168.blog.csdn.net/article/details/139922855)
猜你喜欢
- 2024-09-30 设计模式学习笔记:原型模式以及深浅拷贝的区别
- 2024-09-30 金三银四涨薪计划Java不懂这些核心技能,还想去大厂?
- 2024-09-30 我赌你不懂系列:序列化是什么(序列化的概念)
- 2024-09-30 使用MVP+RxJava+Retrofit框架(mvp retrofit rxjava)
- 2024-09-30 Java开发必须要掌握的20个核心技术,你掌握了多少?
- 2024-09-30 Android 上的序列化和反序列化(序列化和反序列化使用场景)
- 2024-09-30 114、mvc各部分;反射一般场景(反射setaccessible)
- 2024-09-27 # Kafka_深入探秘者(8):kafka 高级应用--2
- 2024-09-27 为什么强烈禁止开发人员使用isSuccess作为变量名
- 2024-09-27 重新认识一个强大的 Gson,从一个线上 BUG 说起
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- jdk (81)
- putty (66)
- rufus (78)
- 内网穿透 (89)
- okhttp (70)
- powertoys (74)
- windowsterminal (81)
- netcat (65)
- ghostscript (65)
- veracrypt (65)
- asp.netcore (70)
- wrk (67)
- aspose.words (80)
- itk (80)
- ajaxfileupload.js (66)
- sqlhelper (67)
- express.js (67)
- phpmailer (67)
- xjar (70)
- redisclient (78)
- wakeonlan (66)
- tinygo (85)
- startbbs (72)
- webftp (82)
- vsvim (79)
本文暂时没有评论,来添加一个吧(●'◡'●)