编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

Wazuh 和 Shuffle 集成(collections.shuffle 作用)

wxchong 2024-07-20 08:42:33 开源技术 6 ℃ 0 评论

大家好,在前面的章节中,我们已经介绍了 Wazuh XDR(扩展检测响应)工具和 Shuffle SOAR(安全协调、自动化和响应)工具,今天我们将介绍它们之间的集成!

Wazuh

Wazuh 是一个开源安全平台,集入侵检测、日志管理和漏洞评估于一身。它包括一个中央管理器、用于主机监控的轻量级代理和一个全面的规则集。通过实时监控和分析,Wazuh 可帮助企业有效地检测和应对安全威胁。它提供用户友好型界面,支持与其他工具集成,并能增强整体安全态势。

Shuffle

Shuffle SOAR 是一个功能强大的开源平台,可实现安全操作的集中化和自动化。它集成了各种安全工具,协调事件响应工作流,并通过预定义的流程自动执行常规任务。凭借先进的情报和报告功能,它使网络安全专业人员能够有效、高效地应对安全事件,最终加强组织的安全态势。

首先,我们打开名为 Wazuh 集成测试的 Shuffle 工作流,复制网络钩子网址

https://<ip-adress>:<port>/api/v1/hooks/webhook_18cdc939-5e7b-428e-b82f-e1481ffc8fe6

首先使用下面的命令在 docker 中获取一个执行终端

docker ps

复制 Wazuh docker-image 的哈希值

docker exec -it <hash-wazuh-docker-image> /bin/bash

转到 /var/ossec/etc/ossec.conf 中的 ossec.conf 文件,然后粘贴此集成 XML 代码

<integration>
    <name> shuffle-integration </name>
  <hook_url>https://<ip-adress>:<port>/api/v1/hooks/webhook_18cdc939-5e7b-428e-b82f-e1481ffc8fe6</hook_url>
    <level>3</level> <!-- you can use <rule_id> </rule_id> instead of <level>-->
        <alert_format>json</alert_format>
</integration>

name:是和集成的名称

hook_url:用于请求接收的 webhook 网址

级别:仅捕获 3 级及以上

alert_format:以 json 格式响应 rest API

现在,我们需要将与 XML 名称完全相同的脚本集成放到 /var/ossec/integrations 中,并创建文件 shuffle-integration.py

#!/usr/bin/env python3
# Created by Shuffle, AS. <frikky@shuffler.io>.
# Based on the Slack integration using Webhooks

import json
import sys
import time
import os

try:
    import requests
        from requests.auth import HTTPBasicAuth
        except Exception as e:
    print("No module 'requests' found. Install: pip install requests")
    sys.exit(1)

# Global vars
debug_enabled = False 
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")

# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)

try:
    with open("/tmp/shuffle_start.txt", "w+") as tmp:
        tmp.write("Script started")
except:
    pass
    
    
    def main(args):
        debug("# Starting")

    # Read args
        alert_file_location = args[1]
    webhook = args[3]

    debug("# Webhook")
    debug(webhook)

    debug("# File location")
    debug(alert_file_location)

    # Load alert. Parse JSON object.
        try:
                with open(alert_file_location) as alert_file:
            json_alert = json.load(alert_file)
    except:
        debug("# Alert file %s doesn't exist" % alert_file_location)

    debug("# Processing alert")
    try:
            debug(json_alert)
    except Exception as e:
        debug("Failed getting json_alert %s" % e)
        sys.exit(1)

    debug("# Generating message")
    msg = generate_msg(json_alert)
    if isinstance(msg, str):
            if len(msg) == 0:
                        return
    debug(msg)

    debug("# Sending message")

    try:
            with open("/tmp/shuffle_end.txt", "w+") as tmp:
            tmp.write("Script done pre-msg sending")
    except:
        pass
        
        
            send_msg(msg, webhook)def debug(msg):
                if debug_enabled:
                          msg = "{0}: {1}\n".format(now, msg)
        print(msg)
        f = open(log_file, "a")
        f.write(msg)
        f.close()

# Skips container kills to stop self-recursion
def filter_msg(alert):
    # These are things that recursively happen because Shuffle starts Docker containers
        skip = ["87924", "87900", "87901", "87902", "87903", "87904", "86001", "86002", "86003", "87932", "80710", "87929", "87928", "5710"]
    if alert["rule"]["id"] in skip:
            return False

    #try:
    #    if "docker" in alert["rule"]["description"].lower() and "
    #msg['text'] = alert.get('full_log')
    #except:
    #    pass
        #msg['title'] = alert['rule']['description'] if 'description' in alert['rule'] else "N/A"

    return True

def generate_msg(alert):
    if not filter_msg(alert):
            print("Skipping rule %s" % alert["rule"]["id"])
        return ""

    level = alert['rule']['level']

    if (level <= 4):
            severity = 1
    elif (level >= 5 and level <= 7):
            severity = 2
    else:
            severity = 3

    msg = {}
    msg['severity'] = severity 
    msg['pretext'] = "WAZUH Alert"
    msg['title'] = alert['rule']['description'] if 'description' in alert['rule'] else "N/A"
    msg['text'] = alert.get('full_log')
    msg['rule_id'] = alert["rule"]["id"]
    msg['timestamp'] = alert["timestamp"]
    msg['id'] = alert['id']
    msg["all_fields"] = alert

    #msg['fields'] = []
    #    msg['fields'].append({
          #        "title": "Agent",
          #        "value": "({0}) - {1}".format(
          #            alert['agent']['id'],
          #            alert['agent']['name']
         #        ),
               #    })
                   #if 'agentless' in alert:
                       #    msg['fields'].append({
                             #        "title": "Agentless Host",
                             #        "value": alert['agentless']['host'],
                             #    })
                        
                            #msg['fields'].append({"title": "Location", "value": alert['location']})
                            #msg['fields'].append({
                                  #    "title": "Rule ID",
                                  #    "value": "{0} _(Level {1})_".format(alert['rule']['id'], level),
                                  #})
                        
                            #attach = {'attachments': [msg]}
                        
                            return json.dumps(msg)
                        
                        
                        def send_msg(msg, url):
                            debug("# In send msg")
                            headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
                            res = requests.post(url, data=msg, headers=headers, verify=False)
                            debug("# After send msg: %s" % res)
                        
                        
                        if __name__ == "__main__":
                            try:
                                    # Read arguments
                                            bad_arguments = False
                                if len(sys.argv) >= 4:
                                            msg = '{0} {1} {2} {3} {4}'.format(
                                                              now,
                                                              sys.argv[1],
                                                              sys.argv[2],
                                                              sys.argv[3],
                                                              sys.argv[4] if len(sys.argv) > 4 else '',
                                                          )
                                                                      #debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug')
                                                                                  debug_enabled = True
                                else:
                                            msg = '{0} Wrong arguments'.format(now)
                                    bad_arguments = True
                        
                                # Logging the call
                                        try:
                                                    f = open(log_file, 'a')
                                except:
                                    f = open(log_file, 'w+')
                                    f.write("")
                                    f.close()
                        
                                f = open(log_file, 'a')
                                f.write(msg + '\n')
                                f.close()
                        
                                if bad_arguments:
                                              debug("# Exiting: Bad arguments. Inputted: %s" % sys.argv)
                                    sys.exit(1)
                        
                                # Main function
                                main(sys.argv)
                        
                            except Exception as e:
                                debug(str(e))
                                raise
                                


还有这个名为 shuffle-integration 的 bash 脚本

#!/bin/sh
# Created by Shuffle, AS. <frikky@shuffler.io>.

WPYTHON_BIN="framework/python/bin/python3"

SCRIPT_PATH_NAME="$0"

DIR_NAME="$(cd $(dirname ${SCRIPT_PATH_NAME}); pwd -P)"
SCRIPT_NAME="$(basename ${SCRIPT_PATH_NAME})"

case ${DIR_NAME} in
      */active-response/bin | */wodles*)
        if [ -z "${WAZUH_PATH}" ]; then
                    WAZUH_PATH="$(cd ${DIR_NAME}/../..; pwd)"
        fi
        
                PYTHON_SCRIPT="${DIR_NAME}/${SCRIPT_NAME}.py"
    ;;
    */bin)
        if [ -z "${WAZUH_PATH}" ]; then
                    WAZUH_PATH="$(cd ${DIR_NAME}/..; pwd)"
        fi
        
                PYTHON_SCRIPT="${WAZUH_PATH}/framework/scripts/${SCRIPT_NAME}.py"
    ;;
     */integrations)
        if [ -z "${WAZUH_PATH}" ]; then
                    WAZUH_PATH="$(cd ${DIR_NAME}/..; pwd)"
        fi
        
                PYTHON_SCRIPT="${DIR_NAME}/${SCRIPT_NAME}.py"
    ;;
esac${WAZUH_PATH}/${WPYTHON_BIN} ${PYTHON_SCRIPT} "$@"


现在,我们需要更改文件的权限和所有者

chown root:ossec shuffle-integration*

chmod 750 shuffle-integration*

现在一切都配置好了,让我们用以下命令重启 Wazuh

systemctl restart wazuh-manager

我们可以通过生成事件(即攻击包含 wazuh-agent 的受监控系统)来测试集成是否正常工作,也可以重启 wazuh-agent,然后检查洗牌工作流中的执行情况。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表