0

0

【AI达人特训营】基于飞桨实现系统认证风险预测-异常检测

P粉084495128

P粉084495128

发布时间:2025-08-01 17:14:53

|

717人浏览过

|

来源于php中文网

原创

该项目基于百度飞桨实现系统认证风险预测的异常检测。先对用户认证行为数据处理,提取特征如时间差、不同维度的统计量等。构建含多层全连接层的分类模型,用kl_loss损失函数,经训练和验证,评估模型性能,最终生成预测结果用于判断用户认证行为是否有风险。

☞☞☞AI 智能聊天, 问答助手, AI 智能搜索, 免费无限量使用 DeepSeek R1 模型☜☜☜

【ai达人特训营】基于飞桨实现系统认证风险预测-异常检测 - php中文网

【AI达人特训营】系统认证风险预测-异常检测

本项目基于百度飞桨PaddlePaddle实现系统认证风险预测-异常检测,构造特征,修改模型,提高线下f1分数。

一、赛题背景

随着国家、企业对安全和效率越来越重视,作为安全基础设施之一的统一身份管理(IAM,Identity and Access Management)也得到越来越多的关注。 在IAM领域中,其主要安全防护手段是身份鉴别,既我们常见的口令验证、扫码验证、指纹验证等。它们一般分为三类,既用户所知(如口令)、所有(如身份证)、特征(如人脸)。但这些身份鉴别方式都有其各自的优缺点,比如口令,强度高了记不住,强度低了容易丢,又比如人脸,摇头晃脑做活体检测体验不好,静默检测如果算法不够,又很容易被照片、视频、人脸模型绕过。也因此,在等保2.0中对于三级以上系统要求必须使用两种以上的身份鉴别方式进行身份验证,来提高身份鉴别的可信度,这也被成为双因素认证。

但这对用户来说虽然在一定程度提高了安全性,但极大的降低了用户体验。也因此,IAM厂商开始参考UEBA、用户画像等行为分析技术,来探索一种既能确保用户体验,又能提高身份鉴别强度的方法。而在当前IAM的探索进程当中,目前最为可具落地的方法,是基于规则的行为分析技术,因为它的可理解性很高,且很容易与身份鉴别技术进行联动。

但基于规则但局限性也很明显,它是基于经验的,有宁错杀一千,不放过一个的特点,缺少从数据层面来证明是否有人正在尝试窃取或验证非法获取的身份信息,又或者正在使用窃取的身份信息,以此来提前进行风险预警和处置。

更多内容请前往比赛官网查看:系统认证风险预测-异常检测

比赛任务

本赛题中,参赛团队需要基于用户认证行为数据及风险异常标记结构,构建用户认证行为特征模型和风险异常评估模型,利用风险评估模型去判断当前用户认证行为是否存在风险。

Fish Audio
Fish Audio

为所有人准备的音频 AI

下载
  • 利用用户认证数据构建行为基线
  • 采用监督学习模型,基于用户认证行为特征,构建风险异常评估模型,判断当前用户认证行为是否存在风险

二、数据分析及处理

比赛数据是从竹云的风险分析产品日志库中摘录而来,主要涉及认证日志与风险日志数据。比赛数据经过数据脱敏和数据筛选等安全处理操作,供大家使用。其中认证日志是用户在访问应用系统产生的行为数据,包括登录、单点登录、退出等行为。

该比赛使用的数据已上传至AI Studio:https://aistudio.baidu.com/aistudio/datasetdetail/112151

In [1]
!pip uninstall paddlepaddle -y
   
In [2]
!pip install paddlepaddle
   

数据概况

In [3]
import pandas as pdimport numpy as numpyimport json

train = pd.read_csv('data/data112151/train_dataset.csv', sep='\t')
test = pd.read_csv('data/data112151/test_dataset.csv', sep='\t')
data = pd.concat([train, test])
   
In [4]
index = 0data['ii'] = data['session_id'].apply(lambda x: int(x[-7:-5]))
data['location_first_lvl'] = data['location'].astype(str).apply(lambda x: json.loads(x)['first_lvl'])
data['location_sec_lvl'] = data['location'].astype(str).apply(lambda x: json.loads(x)['sec_lvl'])
data['location_third_lvl'] = data['location'].astype(str).apply(lambda x: json.loads(x)['third_lvl'])
data.drop(['client_type', 'browser_source'], axis=1, inplace=True)
data['auth_type'].fillna('1', inplace=True)
   
In [5]
from tqdm import tqdmfrom sklearn.preprocessing import LabelEncoderfor col in tqdm(['user_name', 'action', 'auth_type', 'ip',                 'ip_location_type_keyword', 'ip_risk_level', 'location', 'device_model',                 'os_type', 'os_version', 'browser_type', 'browser_version',                 'bus_system_code', 'op_target', 'location_first_lvl', 'location_sec_lvl',                 'location_third_lvl']):
    lbl = LabelEncoder()
    data[col] = lbl.fit_transform(data[col])
   
In [6]
import numpy as np
data['op_date'] = pd.to_datetime(data['op_date'])

data['year'] = data['op_date'].dt.year
data['month'] = data['op_date'].dt.month
data['day'] = data['op_date'].dt.day
data['hour'] = data['op_date'].dt.hour

data['op_ts'] = data["op_date"].values.astype(np.int64) // 10 ** 9data = data.sort_values(by=['user_name', 'op_ts']).reset_index(drop=True)
data['last_ts'] = data.groupby(['user_name'])['op_ts'].shift(1)
data['ts_diff1'] = data['op_ts'] - data['last_ts']
   
In [7]
for f in ['ip', 'location', 'device_model', 'os_version', 'browser_version']:
    data[f'user_{f}_nunique'] = data.groupby(['user_name'])[f].transform('nunique')for method in ['mean', 'max', 'min', 'std', 'sum', 'median']:    for col in ['user_name', 'ip', 'location', 'device_model', 'os_version', 'browser_version']:
        data[f'ts_diff1_{method}_' + str(col)] = data.groupby(col)['ts_diff1'].transform(method)
   
In [8]
group_list = ['user_name','ip', 'location', 'device_model', 'os_version', 'browser_version', 'op_target']
   
In [9]
cat_cols = ['action', 'auth_type', 'browser_type',            'browser_version', 'bus_system_code', 'device_model',            'ip', 'ip_location_type_keyword', 'ip_risk_level', 'location', 'op_target',            'os_type', 'os_version', 'user_name'
            ]
   
In [10]
data.drop(columns = 'session_id', inplace=True)
   
In [11]
data.drop(columns='op_date',inplace=True)
   
In [12]
features = [item for item in data.columns if item != 'risk_label']
traindata = data[~data['risk_label'].isnull()].reset_index(drop=True)
traindata = traindata.fillna(0)
testdata = data[data['risk_label'].isnull()].reset_index(drop=True)for item in features:
    testdata[item] = testdata[item].fillna(0)
   
In [13]
data_X = traindata[features].values[:12000]
data_Y = traindata['risk_label'].values[:12000].astype(int).reshape(-1, 1)
data_X_test = traindata[features].values[12000:]
data_Y_test = traindata['risk_label'].values[12000:].astype(int).reshape(-1, 1)
testdata = testdata[features].values# 归一化from sklearn.preprocessing import MinMaxScaler
mm = MinMaxScaler()
data_X = mm.fit_transform(data_X)
data_X_test = mm.fit_transform(data_X_test)
testdata = mm.fit_transform(testdata)
   
In [14]
testdata.shape
   

三、模型组网

使用飞桨PaddlePaddle进行组网,在本基线系统中,只使用两层全连接层完成分类任务。

In [15]
import randomimport paddle

seed = 1234# 设置随机种子 固定结果def set_seed(seed):
    np.random.seed(seed)
    random.seed(seed)
    paddle.seed(seed)

set_seed(seed)
   
In [16]
import paddleimport paddle.nn as nn# 定义动态图class Classification(paddle.nn.Layer):
    def __init__(self):
        super(Classification, self).__init__()
        nn.Tanh
        self.drop = paddle.nn.Dropout(p=0.2)        # 定义一层全连接层,输出维度是2,激活函数为None,即不使用激活函数
        self.fc1 = paddle.nn.Linear(66, 64)
        self.fc2 = paddle.nn.Linear(64, 32)
        self.fc3 = paddle.nn.Linear(32, 16)
        self.fc4 = paddle.nn.Linear(16, 8)
        self.fc5= paddle.nn.Linear(8, 2)
        self.Tanh = nn.Tanh()    
    # 网络的前向计算函数
    def forward(self, inputs):
        x = self.Tanh(self.fc1(inputs))
        x = self.drop(x)
        x = self.Tanh(self.fc2(x))
        x = self.drop(x)
        x = self.Tanh(self.fc3(x))
        x = self.drop(x)
        x = self.Tanh(self.fc4(x))
        x = self.drop(x)
        pred = self.fc5(x)        return pred
   

四、配置参数及训练

记录日志

In [17]
# 定义绘制训练过程的损失值变化趋势的方法draw_train_processtrain_nums = []
train_costs = []def draw_train_process(iters,train_costs):
    title="training cost"
    plt.title(title, fontsize=24)
    plt.xlabel("iter", fontsize=14)
    plt.ylabel("cost", fontsize=14)
    plt.plot(iters, train_costs,color='red',label='training cost') 
    plt.grid()
    plt.show()
   

定义损失函数

损失函数使用【R-Drop:摘下SOTA的Dropout正则化策略】里的kl_loss:

In [36]
import paddleimport paddle.nn.functional as Fclass kl_loss(paddle.nn.Layer):
    def __init__(self):
       super(kl_loss, self).__init__()    def forward(self, p, q, label):
        ce_loss = 0.5 * (F.mse_loss(p, label=label)) + F.mse_loss(q, label=label)
        kl_loss = self.compute_kl_loss(p, q)        # carefully choose hyper-parameters
        loss = ce_loss + 0.3 * kl_loss 

        return loss    def compute_kl_loss(self, p, q):
        
        p_loss = F.kl_div(F.log_softmax(p, axis=-1), F.softmax(q, axis=-1), reduction='none')
        q_loss = F.kl_div(F.log_softmax(q, axis=-1), F.softmax(p, axis=-1), reduction='none')        # You can choose whether to use function "sum" and "mean" depending on your task
        p_loss = p_loss.sum()
        q_loss = q_loss.sum()

        loss = (p_loss + q_loss) / 2

        return loss
   

模型训练

In [37]
import paddle.nn.functional as F
y_preds = []
labels_list = []
BATCH_SIZE = 8train_data = data_X
train_data_y = data_Y
test_data = data_X_test
test_data_y = data_Y_test



compute_kl_loss = kl_loss()def train(model):
    print('start training ... ')    # 开启模型训练模式
    model.train()
    EPOCH_NUM = 5
    train_num = 0
    scheduler = paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=0.001, T_max=int(traindata.shape[0]/BATCH_SIZE*EPOCH_NUM), verbose=False)
    optimizer = paddle.optimizer.Adam(learning_rate=scheduler, parameters=model.parameters())    for epoch_id in range(EPOCH_NUM):        # 在每轮迭代开始之前,将训练数据的顺序随机的打乱
        np.random.shuffle(train_data)        # 将训练数据进行拆分,每个batch包含50条数据
        mini_batches = [np.append(train_data[k:k+BATCH_SIZE],train_data_y[k:k+BATCH_SIZE],axis=1)for k in range(0, len(train_data))]        for batch_id, data in enumerate(mini_batches):
            features_np = np.array(data[:, :66], np.float32)
            labels_np = np.array(data[:, -1:], np.float32)
        
            features = paddle.to_tensor(features_np)
            labels = paddle.to_tensor(labels_np)            
            # m = labels.flatten(0)
            # m = np.array(m)
        
            #前向计算
            y_pred1 = model(features)            # x= paddle.argmax(y_pred1,axis=-1)
            # x = np.array(x)
            # acc = acc+np.sum(m == x)
            # print(acc)
            
            y_pred2 = model(features)
            cost = compute_kl_loss(y_pred1, y_pred2, label=labels)            # cost = F.mse_loss(y_pred, label=labels)
            train_cost = cost.numpy()[0]            #反向传播
            cost.backward()            #最小化loss,更新参数
            optimizer.step()            # 清除梯度
            optimizer.clear_grad()            if batch_id % 500 == 0 and epoch_id % 1 == 0:                print("Pass:%d,Cost:%0.5f"%(epoch_id, train_cost))

            train_num = train_num + BATCH_SIZE
            train_nums.append(train_num)
            train_costs.append(train_cost)
model = Classification()
train(model)
   
In [38]
def predict(model):
    print('start predicting')
    model.eval()
    outputs = []
    mini_batches = [np.append(test_data[k:k+BATCH_SIZE],test_data_y[k:k+BATCH_SIZE],axis=1)for k in range(0,len(test_data),BATCH_SIZE)]    for data in mini_batches:
        features_np = np.array(data[:,:66],np.float32)
        features = paddle.to_tensor(features_np)
        pred = model(features)
        out = paddle.argmax(pred,axis=1)
        outputs.extend(out.numpy())    return outputs
outputs = predict(model)
   
In [39]
test_data_y = test_data_y.reshape(-1,)
outputs = np.array(outputs)
np.sum(test_data_y==outputs)/test_data_y.shape[0]
   
In [44]
from sklearn.metrics import f1_score
x = f1_score(test_data_y,outputs)
x
   

绘制趋势曲线

In [40]
import matplotlibimport matplotlib.pyplot as pltimport warnings
warnings.filterwarnings('ignore')
%matplotlib inline

draw_train_process(train_nums, train_costs)
   

五、保存预测结果

模型预测:

In [41]
predict_result = []for infer_feature in testdata:
    infer_feature = infer_feature.reshape(1,66)
    infer_feature = paddle.to_tensor(np.array(infer_feature, dtype='float32'))
    result = model(infer_feature)
    predict_result.append(result)
   

将结果写入.CSV文件中:

In [42]
import osimport pandas as pd

id_list = [item for item in range(1, 10001)]
label_list = []
csv_file = 'submission.csv'for item in range(len(id_list)):
    label = np.argmax(predict_result[item])
    label_list.append(label)

data = {'id':id_list, 'ret':label_list}
df = pd.DataFrame(data)
df.to_csv(csv_file, index=False, encoding='utf8')
   

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
PHP高性能API设计与Laravel服务架构实践
PHP高性能API设计与Laravel服务架构实践

本专题围绕 PHP 在现代 Web 后端开发中的高性能实践展开,重点讲解基于 Laravel 框架构建可扩展 API 服务的核心方法。内容涵盖路由与中间件机制、服务容器与依赖注入、接口版本管理、缓存策略设计以及队列异步处理方案。同时结合高并发场景,深入分析性能瓶颈定位与优化思路,帮助开发者构建稳定、高效、易维护的 PHP 后端服务体系。

5

2026.03.04

AI安装教程大全
AI安装教程大全

2026最全AI工具安装教程专题:包含各版本AI绘图、AI视频、智能办公软件的本地化部署手册。全篇零基础友好,附带最新模型下载地址、一键安装脚本及常见报错修复方案。每日更新,收藏这一篇就够了,让AI安装不再报错!

12

2026.03.04

Swift iOS架构设计与MVVM模式实战
Swift iOS架构设计与MVVM模式实战

本专题聚焦 Swift 在 iOS 应用架构设计中的实践,系统讲解 MVVM 模式的核心思想、数据绑定机制、模块拆分策略以及组件化开发方法。内容涵盖网络层封装、状态管理、依赖注入与性能优化技巧。通过完整项目案例,帮助开发者构建结构清晰、可维护性强的 iOS 应用架构体系。

33

2026.03.03

C++高性能网络编程与Reactor模型实践
C++高性能网络编程与Reactor模型实践

本专题围绕 C++ 在高性能网络服务开发中的应用展开,深入讲解 Socket 编程、多路复用机制、Reactor 模型设计原理以及线程池协作策略。内容涵盖 epoll 实现机制、内存管理优化、连接管理策略与高并发场景下的性能调优方法。通过构建高并发网络服务器实战案例,帮助开发者掌握 C++ 在底层系统与网络通信领域的核心技术。

25

2026.03.03

Golang 测试体系与代码质量保障:工程级可靠性建设
Golang 测试体系与代码质量保障:工程级可靠性建设

Go语言测试体系与代码质量保障聚焦于构建工程级可靠性系统。本专题深入解析Go的测试工具链(如go test)、单元测试、集成测试及端到端测试实践,结合代码覆盖率分析、静态代码扫描(如go vet)和动态分析工具,建立全链路质量监控机制。通过自动化测试框架、持续集成(CI)流水线配置及代码审查规范,实现测试用例管理、缺陷追踪与质量门禁控制,确保代码健壮性与可维护性,为高可靠性工程系统提供质量保障。

77

2026.02.28

Golang 工程化架构设计:可维护与可演进系统构建
Golang 工程化架构设计:可维护与可演进系统构建

Go语言工程化架构设计专注于构建高可维护性、可演进的企业级系统。本专题深入探讨Go项目的目录结构设计、模块划分、依赖管理等核心架构原则,涵盖微服务架构、领域驱动设计(DDD)在Go中的实践应用。通过实战案例解析接口抽象、错误处理、配置管理、日志监控等关键工程化技术,帮助开发者掌握构建稳定、可扩展Go应用的最佳实践方法。

60

2026.02.28

Golang 性能分析与运行时机制:构建高性能程序
Golang 性能分析与运行时机制:构建高性能程序

Go语言以其高效的并发模型和优异的性能表现广泛应用于高并发、高性能场景。其运行时机制包括 Goroutine 调度、内存管理、垃圾回收等方面,深入理解这些机制有助于编写更高效稳定的程序。本专题将系统讲解 Golang 的性能分析工具使用、常见性能瓶颈定位及优化策略,并结合实际案例剖析 Go 程序的运行时行为,帮助开发者掌握构建高性能应用的关键技能。

48

2026.02.28

Golang 并发编程模型与工程实践:从语言特性到系统性能
Golang 并发编程模型与工程实践:从语言特性到系统性能

本专题系统讲解 Golang 并发编程模型,从语言级特性出发,深入理解 goroutine、channel 与调度机制。结合工程实践,分析并发设计模式、性能瓶颈与资源控制策略,帮助将并发能力有效转化为稳定、可扩展的系统性能优势。

26

2026.02.27

Golang 高级特性与最佳实践:提升代码艺术
Golang 高级特性与最佳实践:提升代码艺术

本专题深入剖析 Golang 的高级特性与工程级最佳实践,涵盖并发模型、内存管理、接口设计与错误处理策略。通过真实场景与代码对比,引导从“可运行”走向“高质量”,帮助构建高性能、可扩展、易维护的优雅 Go 代码体系。

20

2026.02.27

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
SQL 教程
SQL 教程

共61课时 | 4.2万人学习

10分钟--Midjourney创作自己的漫画
10分钟--Midjourney创作自己的漫画

共1课时 | 0.1万人学习

Midjourney 关键词系列整合
Midjourney 关键词系列整合

共13课时 | 0.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号