编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

# Kafka_深入探秘者(8):kafka 高级应用--1

wxchong 2024-09-27 05:42:38 开源技术 15 ℃ 0 评论

#寻找热爱表达的你#

# Kafka_深入探秘者(8):kafka 高级应用--1

段子手168

## 一、kafka 消费组管理

### 1、kafka 命令行工具

参考官网: http://kafka.apache.org/22/documentation.html

### 2、kafka 消费组管理:查看消费组

```java

# 切换到 kafka 安装目录

cd /usr/local/kafka/kafka_2.12-2.8.0/

# 或者

cd /usr/local/kafka/kafka_2.12-2.2.1/

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):

bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --list

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --list

# 查看消费组命令

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

```

### 3、kafka 消费组管理:查看消费组详情

```java

# 切换到 kafka 安装目录

cd /usr/local/kafka/kafka_2.12-2.8.0/

# 或者

cd /usr/local/kafka/kafka_2.12-2.2.1/

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):

bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --describe --group group.demo

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group group.demo

# 查看消费组详情命令(group.demo 是你自己创建的需要查看的组名)

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo

```

### 4、kafka 消费组管理:查看消费组当前状态

```java

# 切换到 kafka 安装目录

cd /usr/local/kafka/kafka_2.12-2.8.0/

# 或者

cd /usr/local/kafka/kafka_2.12-2.2.1/

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):

bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --describe --group group.demo --state

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group group.demo --state

# 查看消费组当前状态

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo --state

```



### 5、kafka 消费组管理:查看消费组内成员信息

```java

# 切换到 kafka 安装目录

cd /usr/local/kafka/kafka_2.12-2.8.0/

# 或者

cd /usr/local/kafka/kafka_2.12-2.2.1/

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):

bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --describe --group group.demo --members

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group group.demo --members

# 查看消费组内成员信息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo --members

```

### 6、kafka 消费组管理:删除消费组,如果有消费者在使用,则会删除失败。

```java

# 切换到 kafka 安装目录

cd /usr/local/kafka/kafka_2.12-2.8.0/

# 或者

cd /usr/local/kafka/kafka_2.12-2.2.1/

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):

bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110 --delete --group group.demo

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --delete --group group.demo

# 删除消费组,如果有消费者在使用,则会删除失败

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group group.demo

```

### 7、kafka 消费位移管理:重置消费位移,如果有消费者在使用,则会重置失败。

1)kafka 重置消费位移命令:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute

2)命令参数说明:

--all-topics :指定了所有主题。

--reset-offsets :重置消费。

--to-earliest :移到最后

--execute :开始执行命令

```java

# 切换到 kafka 安装目录

cd /usr/local/kafka/kafka_2.12-2.8.0/

# 或者

cd /usr/local/kafka/kafka_2.12-2.2.1/

# 重置消费位移,如果有消费者在使用,则会重置失败

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):

bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute

```



## 二、kafka 数据管道 Connect 文件系统

### 1、kafka 数据管道 Connect 文件系统 概述




1)Kafka 是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为何集成其他系统和解耦应用,经常使用 Producer 来发送消息到 Broker,并使用 Consumer 来消费 Broker 中的消息。Kafka Connect 是到 0.9 版本才提供的并极大的简化了其他系统与 Kafka 的集成。

2)Kafka Connect 运用用户快速定义并实现各种 Connector(File,dbc,Hdfs等),这些功能让大批量数据导入/导出 Kafka 很方便。

3)在 Kafka Connect 中还有两个重要的概念: Task 和 Worker。

4)Connect 中一些概念:

- 连接器: 实现了 ConnectAPI,决定需要运行多少个任务,按照任务来进行数据复制,从 work 进程获取任务配置并将其传递下去。

- 任务: 负责将数据移入或移出 Kafka。

- work 进程: 相当与 connector 和任务的容器,用于负责管理连接器的配置、启动连接器和连接器任务,提供 RESTAPI。

- 转换器: kafka connect 和其他存储系统直接发送或者接受数据之间转换数据。

### 2、kafka 独立模式-文件系统

场景

以下示例使用到了两个 Connector,将文件 source.txt 中的内容通过 Source 连接器写入 Kafka 主题中,然后将内容写入 srouce.sink.txt 中。

- FileStreamSource: 从 source.txt 中读取并发布到 Broker 中。

- FileStreamSink: 从 Broker 中读取数据并写入到 source.sink.txt 文件中。

### 3、步骤详情

#### 3.1、首先我们来看下 Worker 进程用到的配置文件

${(KAFKA_HOME}/config/connect-standalone.properties

```java

# 切换到 kafka 安装目录

cd /usr/local/kafka/kafka_2.12-2.8.0/

// Kafka 集群连接的地址(把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128))

# bootstrap.servers=localhost:9092

bootstrap.servers=172.18.30.110:9092

# 打开并编辑 config/connect-standalone.properties 文件

vim config/connect-standalone.properties

# 修改以下几项:

# 启动端口,注意不能冲突

rest.port=8084

// 格式转化类

key.converter=org.apache.kafka.connect.json.JsonConverter

value.converter=org.apache.kafka.connect.json.JsonConverter

// json消息中是否包含schema

key.converter.schemas.enable=true

value.converter.schemas.enable.true

// 保存偏移量的文件路径

offset.storage.file.filename=/tmp/connect.offsets

// 设定提交偏移量的频率

offset.flush.interval.ms=10808

```

#### 3.2 其中的 Source 使用到的配置文件是 $KAFKA_HOME}/config/connect-file-source.properties

```java

# 切换到 kafka 安装目录

cd /usr/local/kafka/kafka_2.12-2.8.0/

# 打开并编辑 config/connect-file-source.properties 文件

vim config/connect-file-source.properties

# 修改以下几项:

//配置连接器的名称

name.local-file-source

//连接器的全限定名称,设置类名称也是可以的

connector.class=FileStreamSource

// task数量

tasks .max=1

//数据源的文件路径(修改为你自己创建source.txt文件实际路径)

file=/tmp/source.txt

// 主题名称(修改为你自己创建的主题,如:heima)

# topic=topic0703

topic=heima

```

#### 3.3 其中的 Sink 使用到的配置文件是 $KAFKA_HOME}/config/connect-file-sink.properties

```java

# 切换到 kafka 安装目录

cd /usr/local/kafka/kafka_2.12-2.8.0/

# 打开并编辑 config/connect-file-sink.properties 文件

vim config/connect-file-sink.properties

# 修改以下几项:

name=local-file-sink

connectér.class=FileStreamsink

tasks.max=1

# 修改为你自己创建sink.txt文件实际路径

#file=/tmp/source.sink.txt

file=/tmp/sink.txt

# 修改为你自己创建的主题,如:heima

#topics=topice703

topics=heima

```

#### 3.4 启动 source 连接器

```java

# 切换到 kafka 安装目录

cd /usr/local/kafka/kafka_2.12-2.8.0/

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

```

#### 3.5 启动 slink 连接器

```java

# 切换到 kafka 安装目录

cd /usr/local/kafka/kafka_2.12-2.8.0/

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties

```

#### 3.6 source 写入文本信息

```java

# 写入消息到 source.txt

echo "hello kafka,coming">> /tmp/source.txt

# 查询是否写入成功

less /tmp/source.txt

# 同时查询sink.txt 是否有内容传输过来

cat /tmp/sink.txt

```

## 三、kafka springboot+kafka 001

### 1、打开 idea 创建 artifactId 名为 kafka_spring_learn 的 maven 工程。

```java

--> idea --> File

--> New --> Project

--> Maven

Project SDK: ( 1.8(java version "1.8.0_131" )

--> Next

--> Groupld : ( djh.it )

Artifactld : ( kafka_spring_learn )

Version : 1.0-SNAPSHOT

--> Name: ( kafka_spring_learn )

Location: ( ...\kafka_spring_learn\ )

--> Finish

```

### 2、在 kafka_spring_learn 工程的 pom.xml 文件中导入依赖坐标。

```java

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>djh.it</groupId>

<artifactId>kafka_spring_learn</artifactId>

<version>1.0-SNAPSHOT</version>

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>2.1.8.RELEASE</version>

<!-- <relativePath></relativePath>-->

</parent>

<properties>

<java.version>8</java.version>

<!-- <scala.version>2.11</scala.version>-->

<scala.version>2.12</scala.version>

<slf4j.version>1.7.21</slf4j.version>

<!-- <kafka.version>2.0.0</kafka.version>-->

<kafka.version>2.8.0</kafka.version>

<lombok.version>1.18.8</lombok.version>

<junit.version>4.11</junit.version>

<gson.version>2.2.4</gson.version>

<protobuff.version>1.5.4</protobuff.version>

<!-- <spark.version>2.3.1</spark.version>-->

<spark.version>2.4.8</spark.version>

</properties>

<dependencies>

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

<version>2.2.6.RELEASE</version>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-test</artifactId>

</dependency>

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>${kafka.version}</version>

</dependency>

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_${scala.version}</artifactId>

<version>${kafka.version}</version>

<exclusions>

<exclusion>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

</exclusion>

<exclusion>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-log4j12</artifactId>

</exclusion>

<exclusion>

<groupId>log4j</groupId>

<artifactId>log4j</artifactId>

</exclusion>

</exclusions>

</dependency>

<dependency>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-log4j12</artifactId>

<version>${slf4j.version}</version>

</dependency>

<dependency>

<groupId>com.google.code.gson</groupId>

<artifactId>gson</artifactId>

<version>${gson.version}</version>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>${junit.version}</version>

</dependency>

<dependency>

<groupId>org.projectlombok</groupId>

<artifactId>lombok</artifactId>

<version>${lombok.version}</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>io.protostuff</groupId>

<artifactId>protostuff-core</artifactId>

<version>${protobuff.version}</version>

</dependency>

<dependency>

<groupId>io.protostuff</groupId>

<artifactId>protostuff-runtime</artifactId>

<version>${protobuff.version}</version>

</dependency>

<dependency>

<groupId>com.fasterxml.jackson.core</groupId>

<artifactId>jackson-core</artifactId>

<version>2.9.4</version>

</dependency>

<dependency>

<groupId>com.fasterxml.jackson.module</groupId>

<artifactId>jackson-module-scala_2.11</artifactId>

<version>2.9.5</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-core_${scala.version}</artifactId>

<version>${spark.version}</version>

<exclusions>

<exclusion>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

</exclusion>

<exclusion>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-log4j12</artifactId>

</exclusion>

<exclusion>

<groupId>log4j</groupId>

<artifactId>log4j</artifactId>

</exclusion>

</exclusions>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming_${scala.version}</artifactId>

<version>${spark.version}</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>

<version>${spark.version}</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_${scala.version}</artifactId>

<version>${spark.version}</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>

<version>${spark.version}</version>

</dependency>

</dependencies>

<build>

<plugins>

<plugin>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-maven-plugin</artifactId>

</plugin>

</plugins>

</build>

</project>

```

### 3、在 kafka_spring_learn 工程,创建 application.properties 配置文件。

```java

# kafka_spring_learn\src\main\resources\application.properties

logging.level.root=INFO

# spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

# IP 地址换成你自己的虚拟机 IP 地址

#spring.kafka.producer.bootstrap-servers=192.168.19.128:9092

#spring.kafka.consumer.bootstrap-servers=192.168.19.128:9092

spring.kafka.producer.bootstrap-servers=172.18.30.110:9092

spring.kafka.consumer.bootstrap-servers=172.18.30.110:9092

```

### 4、在 kafka_spring_learn 工程,创建 KafkaLearnApplication.java 启动类文件。

```java

/**

* kafka_spring_learn\src\main\java\djh\it\kafkalearn\KafkaLearnApplication.java

*

* 2024-6-24 创建 KafkaLearnApplication.java 启动类文件

*/

package djh.it.kafkalearn;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.EnableAutoConfiguration;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

@RestController

@SpringBootApplication

@RequestMapping

//避免Gson版本冲突快捷配置

@EnableAutoConfiguration(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})

public class KafkaLearnApplication {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLearnApplication.class);

public static void main(String[] args) {

SpringApplication.run(KafkaLearnApplication.class, args);

}

@RequestMapping("/index")

public String index(){

return "hello,kafka spring comming";

}

@Autowired

private KafkaTemplate template;

private static final String topic = "heima";

// 消息生产者

@GetMapping("/send/{input}")

public String sendToKafka(@PathVariable String input){

this.template.send(topic, input);

return "Send success! " + input;

}

//消息的接收

@KafkaListener(id ="", topics = topic, groupId = "group.demo")

public void listener(String input){

LOGGER.info("message input value:{}", input);

}

}

```

### 5、启动 KafkaLearnApplication.java 启动类,进行测试。

1)浏览器地址栏输入: localhost:8080/index

输出: hello,kafka spring comming

2)浏览器地址栏输入: localhost:8080/send/kafka

输出: Send success! kafka




**`上一节关联链接请点击`**

# Kafka_深入探秘者(7):kafka 稳定性_kafka稳定性-CSDN博客

[# Kafka_深入探秘者(7):kafka 稳定性](https://dzs168.blog.csdn.net/article/details/139922855)

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表