编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

「性能测试」Locust+boomer+prometheus的分布式性能测试方案

wxchong 2024-06-23 19:08:26 开源技术 45 ℃ 0 评论

Docker安装与网络部署

docker安装之后建立网络,避免生产环境申请端口,如果没有安全限制可以跳到最后看简要搭建。

需要多个docker容器,容器之间交互,需要建立一个docker bridge网络创建完网络之后,可以使用以下命令查看一下

sudo apt install docker.io
sudo docker network create locust_net
sudo docker network ls

locust的master节点的准备

locust的master节点只需要负责收集数据即可,性能测试数据的持久化需要上传到prometheus,myzhan大佬已经在boomer代码中开源了一个非常好用的集成了prometheus的locust的master节点代码了,所以我就拿来主义,直接用https://github.com/myzhan/boomer/blob/master/prometheus_exporter.py就好了

# coding: utf8

import six
from itertools import chain

from flask import request, Response
from locust import stats as locust_stats, runners as locust_runners
from locust import User, task, events
from prometheus_client import Metric, REGISTRY, exposition

# This locustfile adds an external web endpoint to the locust master, and makes it serve as a prometheus exporter.
# Runs it as a normal locustfile, then points prometheus to it.
# locust -f prometheus_exporter.py --master

# Lots of code taken from [mbolek's locust_exporter](https://github.com/mbolek/locust_exporter), thx mbolek!


class LocustCollector(object):
    registry = REGISTRY

    def __init__(self, environment, runner):
        self.environment = environment
        self.runner = runner

    def collect(self):
        # collect metrics only when locust runner is spawning or running.
        runner = self.runner

        if runner and runner.state in (locust_runners.STATE_SPAWNING, locust_runners.STATE_RUNNING):
            stats = []
            for s in chain(locust_stats.sort_stats(runner.stats.entries), [runner.stats.total]):
                stats.append({
                    "method": s.method,
                    "name": s.name,
                    "num_requests": s.num_requests,
                    "num_failures": s.num_failures,
                    "avg_response_time": s.avg_response_time,
                    "min_response_time": s.min_response_time or 0,
                    "max_response_time": s.max_response_time,
                    "current_rps": s.current_rps,
                    "median_response_time": s.median_response_time,
                    "ninetieth_response_time": s.get_response_time_percentile(0.9),
                    # only total stats can use current_response_time, so sad.
                    #"current_response_time_percentile_95": s.get_current_response_time_percentile(0.95),
                    "avg_content_length": s.avg_content_length,
                    "current_fail_per_sec": s.current_fail_per_sec
                })

            # perhaps StatsError.parse_error in e.to_dict only works in python slave, take notices!
            errors = [e.to_dict() for e in six.itervalues(runner.stats.errors)]

            metric = Metric('locust_user_count', 'Swarmed users', 'gauge')
            metric.add_sample('locust_user_count', value=runner.user_count, labels={})
            yield metric
            
            metric = Metric('locust_errors', 'Locust requests errors', 'gauge')
            for err in errors:
                metric.add_sample('locust_errors', value=err['occurrences'],
                                  labels={'path': err['name'], 'method': err['method'],
                                          'error': err['error']})
            yield metric

            is_distributed = isinstance(runner, locust_runners.MasterRunner)
            if is_distributed:
                metric = Metric('locust_slave_count', 'Locust number of slaves', 'gauge')
                metric.add_sample('locust_slave_count', value=len(runner.clients.values()), labels={})
                yield metric

            metric = Metric('locust_fail_ratio', 'Locust failure ratio', 'gauge')
            metric.add_sample('locust_fail_ratio', value=runner.stats.total.fail_ratio, labels={})
            yield metric

            metric = Metric('locust_state', 'State of the locust swarm', 'gauge')
            metric.add_sample('locust_state', value=1, labels={'state': runner.state})
            yield metric

            stats_metrics = ['avg_content_length', 'avg_response_time', 'current_rps', 'current_fail_per_sec',
                             'max_response_time', 'ninetieth_response_time', 'median_response_time', 'min_response_time',
                             'num_failures', 'num_requests']

            for mtr in stats_metrics:
                mtype = 'gauge'
                if mtr in ['num_requests', 'num_failures']:
                    mtype = 'counter'
                metric = Metric('locust_stats_' + mtr, 'Locust stats ' + mtr, mtype)
                for stat in stats:
                    # Aggregated stat's method label is None, so name it as Aggregated
                    # locust has changed name Total to Aggregated since 0.12.1
                    if 'Aggregated' != stat['name']:
                        metric.add_sample('locust_stats_' + mtr, value=stat[mtr],
                                          labels={'path': stat['name'], 'method': stat['method']})
                    else:
                        metric.add_sample('locust_stats_' + mtr, value=stat[mtr],
                                          labels={'path': stat['name'], 'method': 'Aggregated'})
                yield metric


@events.init.add_listener
def locust_init(environment, runner, **kwargs):
    print("locust init event received")
    if environment.web_ui and runner:
        @environment.web_ui.app.route("/export/prometheus")
        def prometheus_exporter():
            registry = REGISTRY
            encoder, content_type = exposition.choose_encoder(request.headers.get('Accept'))
            if 'name[]' in request.args:
                registry = REGISTRY.restricted_registry(request.args.get('name[]'))
            body = encoder(registry)
            return Response(body, content_type=content_type)
        REGISTRY.register(LocustCollector(environment, runner))


class Dummy(User):
    @task(20)
    def hello(self):
        pass
  • 第一步: 将代码放进服务器,然后拉取Locust镜像
  • 第二步: 启动locust的master节点容器
sudo docker run -p 8089:8089 -p 5557:5557 -v $PWD/prometheus_exporter.py:/mnt/locust/locustfile.py --name=locust_master --network=locust_net --network-alias=locust_master locustio/locust -f /mnt/locust/locustfile.py --master
  • 可能出现的错误: MoudleNotFoundError: prometheus_client

解决思路: 在上面的py文件加入如下代码

import os
os.system("tail -f /dev/null")
  • 确认master节点执行没问题
 sudo docker restart locust_master
sudo docker ps 
sudo docker exec -it locust_master /bin/bash
pip install prometheus_client

再次使用vim编辑prometheus_exporter.py文件
删除加的两行,保存文件

sudo docker restart locust_master

master节点启动完成,在浏览器输入http://ip:8089可以访问到locust的web页面
在使用http://p:8089/export/prometheus
如果出现如下图所示的prometheus数据,表示启动正确


启动locust的worker节点容器

worker节点是压测的业务代码

package main
 
import (
    "bytes"
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "net/url"
    "strings"
    "time"
 
    "github.com/myzhan/boomer"
)
 
type Status struct {
    Code int    `json:"code"`
    Text string `json:"text"`
}
 
// StatusResp  resp
type StatusResp struct {
    Status Status `json:"status"`
}
 
// CDMResp code data message 三段式
type CDMResp struct {
    Code string `json:"code"`
}
 
// token 登录token
var token string
 
// login 登录
func login() (err error) {
    form := url.Values{}
    form.Add("client_id", "12345")
    form.Add("client_secret", "xxxxxxxxxxxxxx")
    form.Add("grant_type", "password")
    form.Add("username", "1234:1234")
    form.Add("password", "xxxxxxxxxx")
    form.Add("platform", "openplatform_mobile")
    req, err := http.NewRequest(http.MethodPost, "https://xxxx", strings.NewReader(form.Encode()))
 
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    if err != nil {
        panic(err)
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return err
    }
    respBody := make(map[string]interface{})
    err = json.Unmarshal(body, &respBody)
    if err != nil {
        return err
    }
    custom := respBody["custom"]
    if custom == nil {
        return fmt.Errorf("nil custom")
    }
    if cu, ok := custom.(map[string]interface{}); ok {
        token = cu["access_token"].(string)
    } else {
        return fmt.Errorf("failed to assert custom")
    }
    return
}
 
// ParseCMDResp CMD格式的错误判断
func ParseCMDResp(i []byte) error {
    rp := &CDMResp{}
    err := json.Unmarshal(i, rp)
    if err != nil {
        return err
    }
    if rp.Code != "000001" {
        return fmt.Errorf("非正常Code: %s", rp.Code)
    }
    return nil
}
 
// ParseStatusResp status格式返回值的错误判断
func ParseStatusResp(i []byte) error {
    rp := &StatusResp{}
    err := json.Unmarshal(i, rp)
    if err != nil {
        return err
    }
    if rp.Status.Code != 200 {
        return fmt.Errorf("非正常Code: %d", rp.Status.Code)
    }
    return nil
}
 
// closeRespAndlogResult 上报错误,如果err不为空,以err为准
func closeRespAndlogResult(name string, start time.Time, resp *http.Response, err error, parseBody ...func([]byte) error) {
    var finalErr error
    var clen int64
    if resp != nil {
        defer resp.Body.Close()
        clen = resp.ContentLength
    }
    if err != nil {
        finalErr = err
    } else {
        if resp != nil && resp.Status != "200 OK" {
            finalErr = errors.New(resp.Status)
        }
    }
    var b []byte
    // 尝试解析body看看有没有报错
    if len(parseBody) != 0 && finalErr == nil {
        b, err = ioutil.ReadAll(resp.Body)
        if err != nil {
            finalErr = fmt.Errorf("ioutil.ReadAll err: %s", err)
        } else {
            for _, fn := range parseBody {
                perr := fn(b)
                if finalErr == nil {
                    finalErr = perr
                } else {
                    finalErr = fmt.Errorf(finalErr.Error()+", err:%s", perr)
                }
            }
        }
    }
    if printResp {
        log.Println(name, "resp", string(b))
    }
    elapsed := time.Since(start)
    if finalErr != nil {
        log.Println(name, "error", finalErr)
        boomer.RecordFailure("http", name, elapsed.Nanoseconds()/int64(time.Millisecond), finalErr.Error()+string(b))
    } else {
        log.Println(name, "succeed")
        boomer.RecordSuccess("http", name, elapsed.Nanoseconds()/int64(time.Millisecond), clen)
    }
}
 
// generate 
func generate() {
    start := time.Now()
    var resp *http.Response
    var err error
    defer func() {
        closeRespAndlogResult("generate", start, resp, err, ParseCMDResp)
    }()
 
    reqBody := map[string]string{
        "location": "3124",
    }
    reqByte, err := json.Marshal(reqBody)
    if err != nil {
        return
    }
    req, err := http.NewRequest(http.MethodPost, "https://xxxx", bytes.NewBuffer(reqByte))
    if err != nil {
        return
    }
    q := req.URL.Query()
    q.Add("token", token)
    q.Add("userType", "1")
    req.URL.RawQuery = q.Encode()
    req.Header.Set("x-authenticated-clientid", "12345")
    req.Header.Set("Content-Type", "application/json")
 
    resp, err = http.DefaultClient.Do(req)
    return
}
 
// get
func getHeSuan() {
    start := time.Now()
    var err error
    var resp *http.Response
    defer func() {
        closeRespAndlogResult("getHeSuan", start, resp, err, ParseStatusResp)
    }()
    req, err := http.NewRequest(http.MethodPost, "http://xxxx", nil)
    if err != nil {
        return
    }
    req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
    resp, err = http.DefaultClient.Do(req)
    return
}

}

 
var apiName string
var printResp bool
 
var taskByName = map[string]*boomer.Task{
    "generate": {
        Name:   "task",
        Weight: 10,
        Fn:     generate,
    },
    "getHeSuan": {
        Name:   "task",
        Weight: 10,
        Fn:     getHeSuan,
    },
}
 
func init() {
    flag.StringVar(&apiName, "api-name", "", "api名称, 可能的值:used,,save,generate,getHeSuan")
    flag.BoolVar(&printResp, "print-resp", false, "是否打印响应内容")
}
 
func main() {
    flag.Parse()
    var t *boomer.Task
    t, ok := taskByName[apiName]
    if !ok {
        panic(fmt.Sprintf("非法的api name: %s", apiName))
    }
    // 登录
    for i := 0; i < 10; i++ {
        err := login()
        if err != nil {
            log.Println("login failed", err)
            time.Sleep(2 * time.Second)
            continue
        }
        break
    }
 
    log.SetFlags(log.LstdFlags | log.Lshortfile)
 
    boomer.Run(t)
}

启动:

sudo docker run -d -v $PWD/locust_worker.py:/mnt/locust/locustfile.py --name=locust_worker --network=locust_net locustio/locust -f /mnt/locust/locustfile.py --worker --master-host locust_master --master-port 5557
// 使用该命令查看容器的运行状态
sudo docker ps 

此时,master和worker节点都已经起来了,进入web管理页面,看到worker数变为了1


启动prometheus

  • 创建prometheus.yml配置文件
global:
  scrape_interval:     10s
  evaluation_interval: 10s
 
scrape_configs:
  - job_name: prometheus
    static_configs:
      - targets: ['localhost:9090']
        labels:
          instance: prometheus
          
  - job_name: locust
    
    metrics_path: '/export/prometheus'
    static_configs:
      - targets: ['locust_master:8089']  # 这里是locust的master节点启动命令中的network-alias后面的参数 + 内部端口,不要写外部映射的端口号
        labels:
          instance: locust

下载并创建容器,验证

sudo docker run -d -p 9090:9090 -v $PWD/prometheus.yml:/etc/prometheus/prometheus.yml --name=prometheus --network=locust_net --network-alias=prometheus prom/prometheus
sudo docker ps 

浏览器输入http://ip:9090/targets

启动grafana

sudo docker run -d -p 3000:3000 --name grafana --network=locust_net grafana/grafana
  • 登录grafana并进入Configuration,注意浏览器的url

首次登录用户名/密码admin,进去了之后需要改密码,登录后如图进入Configuration

  • 添加数据源,点击Add data source 选择prometheus
  • 配置数据源url

url处输入建立prometheus容器时的–network-alias的别名:9090,端口输入内部端口,不要输入映射的端口,生成环境启动prometheus是没有-p的

  • 导入模板

保存成功后,导入模板,输入id12081点击load,选择一下prometheus然后import

  • 完成启动:


简单部署

不做网络隔离,确保master-slave在同一个网段后进行如下部署。

主从节点的代码一样、prometheus的配置一样。

 master 节点
1.安装 python 3.6 以上版本
2.pip3 install pyzmq==16.0.2
3.pip3 install locust==1.6.0
4.mkdir -p /opt/locust

slave 节点
安装 go
wget https://golang.google.cn/dl/go1.17.5.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.17.5.linux-amd64.tar.gz
echo "export PATH=$PATH:/usr/local/go/bin" >> /etc/profile
source /etc/profile

安装 boomer
mkdir -p /opt/boomer
cd /opt/boomer
go mod init task
go env -w GOPROXY=https://goproxy.cn,direct
go get github.com/myzhan/boomer
go get -u github.com/myzhan/boomer

简单执行方式(非docker镜像模式 ):

Master节点没什么好说的,就是一个简单的py脚本
Slave节点需要如下验证:
可通过 go run task.go --run-tasks simpleGet,complexGet 对脚本功能进行单独验证。
可正确执行即可进行打包:go build -o task task.go (可以一次打包后将可执行任务分发到各 slave 上)

测试执行

master 节点
启动: cd /opt/lucust && locust --master -f master.py --web-port 8089
--web-port 指定 web 页面的端口,默认8089,浏览器访问 服务ip:web-port 即可打开页面

slave 节点
启动:cd /opt/boomer && ./task --master-host 【your ip】
--master-host 指定 master 节点的 ip,默认127.0.0.1
--master-port 指定 master 节点分发任务的端口,默认5557


prometheus 配置修改,增加 locust exporter 配置
vi /opt/deploy/prometheus/prometheus.yml
……
- job_name: locust-exporter
static_configs:
- targets: ['ip:8089']     # locust master web 地址

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表