侧边栏壁纸
博主头像
OUCOS运维技术博客 博主等级

行动起来,活在当下

  • 累计撰写 11 篇文章
  • 累计创建 10 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

高可用实战项目: HFish蜜罐+Flask框架联动核心交换机封禁MAC/IP

OUCOS
2022-11-09 / 0 评论 / 0 点赞 / 195 阅读 / 0 字 / 正在检测是否收录...
温馨提示:
本文最后更新于2024-10-07,若内容或图片失效,请留言反馈。 部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

前言

最近在对公司网络重构的思考中,发现内网安全性的不足,为了提高内网安全性,所以决定采用HFish蜜罐系统+Flask框架中间件实现高效内网封禁机制,可以在恶意流量触发蜜罐时及时封禁,防止进一步扩散;

原理:HFish通过自定义webhook告警传参攻击者IP到Flask中间件,中间件判断攻击者IP如10分钟内攻击次数是否满足条件,防止误判,满足条件则封禁MAC/IP地址

蜜罐联动流量图

如果是内网IP,通过核心交换机arp获取MAC地址,直接使用MAC黑洞封禁

如果是外网IP,核心交换机arp获取不到MAC地址,则直接全局ACL封禁IP

本案例以H3C交换机配置命令配置,可根据对应品牌重构telnet语句即可

注:本项目只适合无成本想提升内网安全,肯定比不上企业级的终端安全系统和流量检测平台等功能,但是也有效提升内网安全性

链接

HFish官方文档

Webhook告警参数文档

技术栈

服务

HFish== 3.3.0

Python== 3.8.13

Mysql== 5.7

Docker== 20.10.17

docker-compose== 1.25.1

Python依赖包

Flask== 2.2.2

sqlalchemy== 1.4.42

pymysql== 1.0.2

前期准备

Mysql部署

Mysql部署用于提供给HFish蜜罐系统使用外部数据库,且开启外网访问,用于Flask框架中间件调用数据库获取攻击统计次数

本页面不提供Mysql部署教程

HFish部署

本页面不提供HFish部署教程,直接访问官方文档根据需求选择部署方式即可

HFish官方部署文档:[https://hfish.net/#/2-0-deploy](https://hfish.net/#/2-0-deploy)

使用外部Mysql数据库

HFish蜜罐系统安装完毕后请选择更换数据库,使用外部Mysql数据库,且确保数据库外部可访问

20221109090841

20221109090804

Flask项目案例

安装依赖包

pip install Flask==2.2.2

pip install sqlalchemy==1.4.42

pip install pymysql==1.0.2

Flask框架项目

本项目直接用Pycharm新建Flask框架,会自动生成app.py文件

目录结构

—static/

—templates/

—app.py

—we.py

—Dockerfile

—requirements.txt

—supervisor_flask.conf

app.py

本案例以H3C交换机命令为例,具体请参考对应的交换机品牌命令即可

案例中acl 3010是全局ACL,用来封禁出入IP流量

from flask import Flask, request, abort

from sqlalchemy import create_engine

from we import TelnetClient

app = Flask(__name__)

# HFish数据库信息

DB_URL = 'mysql+pymysql://账号:密码@数据库IP:端口/库名'

engine = create_engine(DB_URL)

# H3C交换机信息

ip_add = '核心交换机IP地址'

username = '登录账号'

password = '登录密码'

commands = []

ip_mac = ''

ip_vlan = ''

# HFish访问接口白名单

ALLOWED_IPS = ['HFish白名单IP']

# 内网禁用MAC地址网段

MAC_IPS = ['内网网段']

@app.errorhandler(403)

def permission_error(e):

    return e

@app.before_request

def limit_remote_addr():

    client_ip = str(request.remote_addr)

    valid = False

    for ip in ALLOWED_IPS:

        if client_ip.startswith(ip) or client_ip == ip:

            valid = True

            break

    if not valid:

        abort(403)

@app.route('/', methods=['post'])

def index():

    global ip_mac, ip_vlan

    try:

        json_data = request.json

        print(json_data, flush=True)

        src_ip = json_data.get('src_ip')

        source_ip = "'" + src_ip + "'"

        if source_ip is not None:

            sql = 'SELECT count(*) FROM infos WHERE source_ip=%s AND create_time > NOW()-interval 10 minute;' % source_ip

            print(sql, flush=True)

            conn = engine.connect()

            rs = conn.execute(sql)

            ip_rs = rs.fetchone()[0]

            print(src_ip, flush=True)

            print(ip_rs, flush=True)

            # 统计次数触发

            if ip_rs > 2:

                # 内网网段禁用mac

                for ip in MAC_IPS:

                    if src_ip.startswith(ip) or src_ip == ip:

                        tc = TelnetClient()

                        if tc.Login_sw(ip_add, username, password):

                            ip_mac, ip_vlan = tc.MAC_sw(src_ip)

                            print(ip_mac, ip_vlan, flush=True)

                            tc.Logout_sw()

                        comm = 'mac-address blackhole %s vlan %s' % (ip_mac, ip_vlan)

                        print(comm, flush=True)

                        commands.append(comm)

                        tc.Config_sw(commands)

                        tc.Logout_sw()

                        break

                # 匹配不到mac地址时封禁IP

                if ip_mac == '':

                    comm_out = 'rule deny ip source %s 0' % src_ip

                    comm_in = 'rule deny ip destination %s 0' % src_ip

                    commands.append('acl number 3010')

                    commands.append(comm_out)

                    commands.append(comm_in)

                    print(commands, flush=True)

                    tc = TelnetClient()

                    if tc.Login_sw(ip_add, username, password):

                        tc.Config_sw(commands)

                        tc.Logout_sw()

    except:

        return jsonify({"message": -1})

    return jsonify({"message": 0})

if name == '__main__':

    app.run()

we.py

主要负责调用telnet模块,直接访问核心交换机获取MAC地址和执行相关命令

from telnetlib import Telnet

from time import sleep

import re

class TelnetClient():

    def init(self):

        self.tn = Telnet()

    # 登录交换机

    def Login_sw(self, ip_add, username, password):

        # 连接交换机

        try:

            self.tn.open(ip_add, port=23)

            # self.tn.set_debuglevel(2)

        except:

            print(f'{ip_add}:网络连接失败!', flush=True)

            return False

        # login输入用户名与密码

        self.tn.read_until(b'Login: ', timeout=2)

        self.tn.write(username.encode('ascii') + b'\n')

        self.tn.read_until(b'Password: ', timeout=2)

        self.tn.write(password.encode('ascii') + b'\n')

        # 延迟2秒后,返回登录结果

        sleep(2)

        login_result = self.tn.read_very_eager().decode('ascii')

        # print(login_result)

        # print(login_result.replace("\r", ""))[1]

        # 判断登录是否异常

        if "AAA authentication failed" in login_result:

            print(f'{ip_add}:登陆失败!', flush=True)

            return False

        else:

            print(f'{ip_add}:登陆成功!', flush=True)

            return True

    # 获取MAC地址

    def MAC_sw(self, src_ip):

        self.tn.write(b'sys\n')

        mac_commands = 'display arp %s' % src_ip

        print(mac_commands, flush=True)

        self.tn.write(mac_commands.encode('ascii') + b'\n')

        a = []

        # 获取命令执行结果

        b, c, d = self.tn.expect(a, timeout=1)

        print(d, flush=True)

        # 匹配mac地址

        pat = re.compile(r'[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}')

        # print(pat.findall(str(d)))

        ip_mac = pat.findall(str(d))[0]

        # 获取对应的vlan

        ip_vlan = str(d).split(ip_mac + ' ')[-1].split(' ')[0]

        # print(ip_vlan)

        return ip_mac, ip_vlan

    # 配置交换机

    def Config_sw(self, commands):

        self.tn.write(b'sys\n')

        for command in commands:

            self.tn.write(command.encode('ascii') + b'\n')

        sleep(2)

        output = self.tn.read_very_eager().decode('ascii')

        print(output, flush=True)

    # 退出

    def Logout_sw(self):

        self.tn.write(b'quit\n')

        print(self.tn.read_very_eager().decode('ascii'), flush=True)

Docker封装

supervisor_flask.conf【进程管理】

[supervisord]

nodaemon=true

[program:flaskweb]

command=gunicorn -w 5 -b 0.0.0.0:5000 app:app

directory=/src

autostart=true

autorestart=true

redirect_stderr=true

stdout_logfile=/dev/stdout

gunicorn -w 5 -b 0.0.0.0:5000 app:app

-w 5 表示工作进程数

app:app表示使用app.py启动文件中的app实例,请确认app = Flask(name)

Dockerfile

FROM python:3.7-slim

WORKDIR /src

COPY . /src

# 时区

ENV TZ=Asia/Shanghai

RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

# 替换源

RUN sed -i s/deb.debian.org/mirrors.aliyun.com/g /etc/apt/sources.list \

    && sed -i s/security.debian.org/mirrors.aliyun.com/g /etc/apt/sources.list \

    && apt-get update \

    && apt-get install -y default-libmysqlclient-dev gcc \

    && apt-get clean

RUN mkdir -p ~/.pip && mkdir -p /etc/supervisor

# pip源

RUN echo "[global]" >> ~/.pip/pip.conf \

    && echo "trusted-host=mirrors.aliyun.com" >> ~/.pip/pip.conf \

    && echo "index-url=http://mirrors.aliyun.com/pypi/simple/" >> ~/.pip/pip.conf

# supervisor_conf 进程管理

RUN pip install supervisor flask gunicorn

RUN /usr/local/bin/echo_supervisord_conf > /etc/supervisor/supervisord.conf \

    && echo "[include]">>/etc/supervisor/supervisord.conf \

    && echo "files = /etc/supervisor/*.conf">>/etc/supervisor/supervisord.conf

# 拷贝项目进程文件

COPY ./supervisor_flask.conf /etc/supervisor

RUN pip install --no-cache-dir -r requirements.txt

COPY . .

#EXPOSE 5000

CMD ["supervisord","-n","-c","/etc/supervisor/supervisord.conf"]

requirement.txt

终端执行生成项目依赖包文件

注意请检查里面是否存在本地路径,如有本地路径请直接清除后缀路径即可

click

colorama

Flask

Flask-SQLAlchemy==3.0.2

greenlet==2.0.0

importlib-metadata

inflect==6.0.2

itsdangerous

Jinja2

MarkupSafe

pydantic==1.10.2

PyMySQL==1.0.2

sqlacodegen==2.3.0

SQLAlchemy==1.4.42

typing_extensions==4.4.0

Werkzeug

zipp

构建docker image镜像

请确保目录结构已存在

—static/

—templates/

—app.py

—we.py

—Dockerfile

—requirements.txt

—supervisor_flask.conf

# 在项目目录中执行Docker镜像构建命令

docker build -t hfish-webhook:v1 .

镜像名:`hfish-webhook`

镜像版本:`v1`

docker-compose.yml管理docker

使用docker-compose方便管理docker容器启动和关闭

version: '2'

services:

  hfish-webhook:

    image: hfish-webhook:v1

    container_name: hfish-webhook

    restart: always

    ports:

      - 5000:5000

image: hfish-webhook:v1 使用镜像`hfish-webhook:v1`

container_name: hfish-webhook 容器名称` hfish-webhook`

ports 为映射端口

5000:5000即将宿主机5000端口映射为容器的5000端口

启动docker容器

# 需要在docker-compose.yml文件目录下执行

docker-compose up -d hfish-webhook   # 启动名为hfish-webhook容器

docker-compose stop hfish-webhook   # 关闭名为hfish-webhook容器

docker-compose rm -f hfish-webhook  # 删除名为hfish-webhook容器

20221109100635

HFish设置

通知配置自定义webhook

当部署完Flask框架中间件后,将中间件映射的IP+端口作为自定义webhook地址即可

打开HFish页面---`系统配置`---`通知配置`---`通知配置`下拉找到`Webhook配置`

点击`+`号添加,类型选择 自定义,输入Flask框架中间件地址

20221109134057

配置告警策略联动中间件

设置号自定义webhook后,新增告警策略,将所需攻击行为列入告警触发即可

打开HFish页面---`系统配置`---`通知配置`---`告警策略`点击`添加策略`

告警场景选择对应的内网/外网告警,此处默认全部

告警类型选择攻击行为,表示只要触发攻击行为就会触发中间件webhook

通知方式选择`webhook`,并将中间件的自定义webhook勾选即可

20221109134710

效果测试

测试访问HFish蜜罐系统`远程桌面服务`,触发`445`端口蜜罐告警,10分钟内触发3次告警,Flask框架中间件判断符合条件,将获取攻击IP的mac地址,并列入MAC黑洞阻断所有流量

开启445蜜罐

20221109135707

模拟攻击445蜜罐

20221109140121

查看效果

20221109140635

20221109140755

0

评论区