使用Agent智能体快速实现B站大会员活动消息推送

起因:非常凑巧的是刚好自己是b站十年大会员,又非常巧的是b站有大会员的线下观影活动,又非常巧的是十年大会员或百年大会员有百分之十的名额获取资格,由于平时也只是休息时间或者晚上才会打开b站看看up主的视频和b站的活动,前段时间发现现在的各大平台都在做Agent智能体的应用,于是就有了这个想法,我是不是可以借助AI快速实现我需要手动去做的这件事?

说干就干,因为之前做过微信公众号消息推送的功能,所以推送的逻辑可以参考之前的[使用python实现微信公众号定时推送消息]这篇文章,剩下的部分咱们一步一步来。

首先分解任务:


1.获取页面信息:获取b站大会员页正在进行的活动信息

2.解析页面内容:获取响应内容中的enroll_detail中的city 如果包含所在城市则将获取到的记录存入表中

3.根据表中记录的数据进行定时推送:如果从表中扫描到了数据(条件为表中的是否读取标识为未读),将读取到的数据推送到微信中

4.表设计:大概需要知道是什么活动,活动地址是什么,活动地点活动时间这些信息

第一步:创建数据库与数据库表:

这些信息是展示在微信公众号的推送消息中的,因为我这里使用的是测试号,所以不能打开链接或者点击查看信息,所以对于我来说最重要的信息是什么活动,活动地址和活动时间是什么就可以了,到时候只要推送给我我去b站处理信息即可(当然也保留了活动地址,之前是想直接点击活动地址跳转到b站,但是后面发现测试公众号不支持点击推送内容,测试号仅为做测试使用),这里我觉得没有问题,当然你可以做成自动化的跳转后自动填写表单报名,但是这就失去了公平性,不确保这么干的后果包括不限于封号,引起后台风控于模块安全升级等,所以我建议各位不要这么干,做什么都要有个度,不要做的太过了。然后就是需要一个标识,在消息推送后将标识改成已读,这样在下一次扫描时就不会重复推送已经推送过的消息。

下面是建表语句:

CREATE TABLE movie_records (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    title TEXT,               -- 活动标题
    city TEXT,                -- 活动城市
    start_time TEXT,          -- 开始时间
    end_time TEXT,            -- 结束时间
    address TEXT,             -- 活动地址
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,  -- 创建时间
    is_read INTEGER DEFAULT 0 -- 是否已推送(0:未推送,1:已推送)
)

第二步,编写提示词:

需要告诉AI要做什么,从哪个地址获取信息,从地址中获取什么信息,获取哪里的信息,需要明确具体的告诉AI,然后要告知AI获取信息后需要怎么做,这里可以参考第三条。这里可以使用提前准备好需要用到的数据库与数据库表,这里有数据库与表后可以明确告诉AI将哪些数据存入哪个数据库中的哪个表中的哪些字段中(这里可以使用脱敏后的描述,比如可以对数据库的ip端口等信息进行脱敏,到后面手动修改),大概就是上面1,2,3这几步。

第三步,将编写好的提示词喂给AI:

我这里使用的是豆包的应用生成,其实其他的应该也差不多,之后就需要等待AI帮我们编写代码,一段时间后AI会生成第一版代码,大致主体方向没有问题,不仅是代码还编写了单测对代码进行了可用性测试以及README的编写,这个任务会自动分析使用最优的编程语言(这里使用的是python),生成的内容有:日志文件,README文件,主程序,配置文件,单元测试,样例文件以及执行脚本。

得到生成的代码后是不能够直接使用的,因为我们还需要集成推送功能,因为这个功能是现成的,所以没有必要让AI再次生成或者使用AI生成的那一套推送代码,因为现成的代码无需调试即可直接使用,节约时间和精力成本,在打开生成的代码后可以先对生成部分的代码进行测试,即使告诉了AI从什么地方获取数据,有些时候可能会因为我们的描述问题导致这个模型产生的结果会有部分偏差,但是不影响整理功能,对代码进行微调后我们就获取到了从b站大会员活动页获取到进行中的数据,然后又对数据库以及数据表进行初始化后的数据入库进行了测试,最终这套代码从有想法到最终实现并运行只花了不到两鲲天,并且就在本地测试稳定性的第一天就通过推送及时收到了活动消息,并成功参加了活动(因为收到消息就直接去b站查看了,最终是前几名参加的活动,所以根据十年大会员的规则是百分百获取到了参与资格),也是非常幸运的。

那么下面来拆解代码分段进行讲解:

首先是配置文件部分(config.py)

配置文件有:公众号配置,b站活动url,观影城市,数据库配置,大会员url请求头以及微信公众号请求头,定时任务配置相关(推送时间配置,获取数据时间配置),因为代码中都有注释所以不需要太多的解释,都是所见即所得的内容,直接修改即可。

# 公众号配置
# 公众号appId
APP_ID = "填写公众号appId"
# 公众号appSecret
APP_SECRET = "填写公众号appSecret"
# 模板消息id
TEMPLATE_ID = "填写模板消息id"
# 这是openid
USER = "填写openid"

# B站API URL
API_URL = "https://api.bilibili.com/x/vip/web/vip_center/offlinemeeting?csrf=b73e1ac90734545514b1b5398e4345502&t={}"
# B站观影城市
VIEW_CITY = '填写您所在的城市'

# 网络数据库配置(需根据实际环境修改)
DB_CONFIG = {
    "host": "192.168.1.1",  # 例如:localhost(本地网络)、192.168.1.1(局域网)、xxx.cloud.com(公网)
    "port": 3306,  # MySQL 默认端口
    "user": "root",  # 例如:root
    "password": "123456",  # 你的数据库密码
    "database": "bilibili_movies_db",  # 需提前创建好该数据库(例如:movie_db)
    "charset": "utf8mb4",  # 支持emoji等特殊字符
    "cursorclass": pymysql.cursors.DictCursor  # 可选:返回字典格式结果
}


# b站大会员观影请求头配置
BL_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Referer": "https://www.bilibili.com/",
    # 如果你需要登录后的数据,可以添加Cookie
    # "Cookie": "your_bilibili_cookie"
}

# 微信公众号请求头配置
WX_HEADERS = {
    'Content-Type': 'application/json',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                  'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'
}

# 定时任务配置
# 获取数据的时间间隔(分钟)
FETCH_INTERVAL_MINUTES = 60

# 检查并推送的时间间隔(分钟)
PUSH_INTERVAL_MINUTES = 5

然后就是主程序(main.py)

这部分在本地运行后没有问题,但是在服务器上运行时出现了字符集不一致的问题,在告诉AI问题后进行了优化

所以在开头先让代码强制使用utf-8:

sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')

对日志也强制使用utf-8:

# 强制使用UTF-8编码
def setup_logging():
    """配置日志,解决中文编码问题"""
    # 创建日志格式
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # 文件处理器(强制UTF-8编码)
    file_handler = logging.FileHandler("bilibili_notifier.log", encoding='utf-8')
    file_handler.setFormatter(formatter)
    file_handler.setLevel(logging.INFO)

    # 控制台处理器(解决中文输出问题)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    console_handler.setLevel(logging.INFO)

    # 配置根日志器
    logger = logging.getLogger("bilibili_movie_notifier")
    logger.setLevel(logging.INFO)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger


# 初始化日志
logger = setup_logging()


# 兼容代码中的logger引用
logger = logging.getLogger("bilibili_movie_notifier")

将配置文件内容引入:

# 从配置文件导入配置项
from config import (
    API_URL,
    BL_HEADERS,
    WX_HEADERS,
    APP_ID,
    APP_SECRET,
    TEMPLATE_ID,
    USER,
    VIEW_CITY,
    DB_CONFIG,
    FETCH_INTERVAL_MINUTES,
    PUSH_INTERVAL_MINUTES
)

这里直接初始化数据库中的表结构,也就是说只需要提供一个能够接收数据的数据库即可,在运行主程序时会先帮你创建数据库表(如果已经存在则不会重复创建或者被覆盖),能看出对于程序可能存在的异常问题AI也帮我们进行了兜底。


def init_database():
    """初始化MySQL网络数据库和表结构"""
    conn = None
    try:
        # 连接网络MySQL数据库
        conn = pymysql.connect(**DB_CONFIG)
        cursor = conn.cursor()

        # 创建表(适配MySQL语法:AUTO_INCREMENT 无需加括号,TIMESTAMP默认值调整)
        cursor.execute('''
                       CREATE TABLE IF NOT EXISTS movie_records
                       (
                           id
                           INT
                           PRIMARY
                           KEY
                           AUTO_INCREMENT,
                           title
                           VARCHAR
                       (
                           255
                       ) NOT NULL, # MySQL推荐用VARCHAR指定长度
                           city VARCHAR
                       (
                           100
                       ) NOT NULL,
                           start_time VARCHAR
                       (
                           50
                       ) NOT NULL,
                           end_time VARCHAR
                       (
                           50
                       ) NOT NULL,
                           address VARCHAR
                       (
                           255
                       ) NOT NULL,
                           create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                           is_read TINYINT DEFAULT 0 # MySQL用TINYINT替代SQLite的INTEGER(更省空间)
                       ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
                       ''')

        conn.commit()
        logger.info("网络数据库初始化成功(表结构已创建/验证)")

    except pymysql.MySQLError as e:
        # 替换f-string为format方式,兼容更低版本Python3
        logger.error("网络数据库初始化失败: 错误码%s, 错误信息%s", e.args[0], e.args[1])
        if conn:
            conn.rollback()  # 出错时回滚
    except Exception as e:
        logger.error("未知错误: %s", str(e))
    finally:
        # 确保连接关闭
        if conn:
            conn.close()

获取b站大会员活动数据,这里会在url后拼接一个时间戳来确保链接的可用性(其实不加也不影响),当获取到数据后会返回响应数据。

def fetch_bilibili_data():
    """
    从B站API获取大会员电影信息
    返回: 响应的JSON数据
    """
    try:
        # 使用当前时间戳
        timestamp = int(time.time() * 1000)
        url = API_URL.format(timestamp)

        response = requests.get(url, headers=BL_HEADERS, timeout=10)
        response.raise_for_status()  # 检查请求是否成功

        logger.info("成功获取B站数据")
        return response.json()
    except requests.exceptions.RequestException as e:
        # 替换f-string为format方式
        logger.error("获取数据失败: %s", e)
        return None
    except Exception as e:
        logger.error("处理响应数据时出错: %s", e)
        return None

将获取的数据进行解析并获取我们需要的数据,由于后与数据落库是同步的,所以在获取数据时获取了数据库连接备用。


def parse_and_store_data(data):
    """
    解析数据并存储符合条件的记录
    参数: data - 从API获取的JSON数据
    """
    if not data or 'data' not in data:
        logger.warning("无效的数据格式")
        return

    try:
        conn = pymysql.connect(**DB_CONFIG)
        cursor = conn.cursor()

        new_records_count = 0

        # 解析正在进行的活动 (enroll_detail)
        if 'offline_meeting' in data['data'] and 'enroll_detail' in data['data']['offline_meeting']:
            enroll_detail = data['data']['offline_meeting']['enroll_detail']
            if enroll_detail and 'items' in enroll_detail:
                for item in enroll_detail['items']:
                    # 检查city是否包含您的所在城市
                    if 'city' in item and VIEW_CITY in item['city']:
                        # 提取需要的字段
                        title = item.get('title', '')
                        city = item.get('city', '')
                        # 将时间戳转换为日期时间字符串
                        start_time = datetime.fromtimestamp(item.get('start_time', 0)).strftime('%Y-%m-%d %H:%M:%S')
                        end_time = datetime.fromtimestamp(item.get('end_time', 0)).strftime('%Y-%m-%d %H:%M:%S')
                        # 获取活动地址
                        address = item.get('link', '')

组装获取到的数据进行数据落库,并在操作结束后关闭数据库连接,因为是个小型的项目,所以并不需要考虑性能问题,所以没有使用连接池。

                        # 检查是否已存在相同记录
                        cursor.execute("SELECT id FROM movie_records WHERE title=%s AND start_time=%s",
                                       (title, start_time))
                        if not cursor.fetchone():
                            # 插入新记录
                            cursor.execute('''
                                           INSERT INTO movie_records (title, city, start_time, end_time, address, is_read)
                                           VALUES (%s, %s, %s, %s, %s, %s)
                                           ''', (title, city, start_time, end_time, address, 0))
                            logger.info("已添加新记录: %s", title)
                            new_records_count += 1

            if new_records_count > 0:
                logger.info("共添加 %s 条新记录", new_records_count)
            else:
                logger.info("没有发现新的%s活动记录", VIEW_CITY)
        else:
            logger.warning("数据中没有找到enroll_detail字段")

        conn.commit()
        conn.close()
    except Exception as e:
        logger.error("解析和存储数据时出错: %s", e)

检查推送数据,这里的逻辑是,隔一段时间会扫描一下数据库表,开表中是否存在未推送的数据,如果存在则进行数据推送,当推送成功后将数据的推送状态改成已推送,微信公众号推送的代码在微信公众号定时推送消息的那篇文章中有,这里就不重复列举了,逻辑是一样的。


def check_and_push():
    """检查未读记录并推送"""
    try:
        conn = pymysql.connect(**DB_CONFIG)
        cursor = conn.cursor()

        # 查询未读记录
        cursor.execute("SELECT id, title, city, start_time, end_time, address FROM movie_records WHERE is_read=0")
        unread_records = cursor.fetchall()

        if unread_records:
            logger.info("发现 %s 条未读记录", len(unread_records))

            for record in unread_records:
                # 获取消息内容,第二个参数是默认值,key不存在时返回空字符串
                id = record.get('id', '')
                title = record.get('title', '')
                city = record.get('city', '')
                start_time = record.get('start_time', '')
                end_time = record.get('end_time', '')
                address = record.get('address', '')

                # 格式化时间
                try:
                    start_dt = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
                    end_dt = datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
                    formatted_start = start_dt.strftime("%Y年%m月%d日 %H:%M")
                    formatted_end = end_dt.strftime("%Y年%m月%d日 %H:%M")
                except ValueError:
                    formatted_start = start_time
                    formatted_end = end_time

                # 发送微信通知
                if send_wechat_notification(title, city, formatted_start, formatted_end, address):
                    # 更新为已读
                    cursor.execute("UPDATE movie_records SET is_read=1 WHERE id=%s", (id,))
                    logger.info("已推送并标记为已读: %s", title)
        else:
            logger.info("没有发现未读记录")

        conn.commit()
        conn.close()
    except Exception as e:
        logger.error("检查和推送记录时出错: %s", e)

定时调度,当调度开始时先去获取b站活动数据,如果获取到新的活动数据则同步将数据存储到数据库中,否则查看数据库表中是否存在未推送的数据,如果存在则进行推送反之则不推送(这部分逻辑只在项目第一次运行时立即执行),分别为获取存储大会员活动信息与消息推送设置调度时间,这里我是设置的五分钟扫描一下库表,一个小时获取一次b站大会员数据,所以可能会存在一些问题(即当我收到公众号消息时这个活动已经开始一段时间了),其实也可以改,但是一个两个小时的信息差其实时可以接受的,毕竟大会员也不是人人都会开,人数时间比感觉也没什么太大问题。

def run_scheduled_tasks():
    """运行定时任务"""
    # 立即获取一次数据
    logger.info("开始首次获取数据...")
    data = fetch_bilibili_data()
    if data:
        parse_and_store_data(data)

    # 检查是否有未读记录需要推送
    check_and_push()

    # 设置定时任务
    # 每FETCH_INTERVAL_MINUTES分钟获取一次数据
    schedule.every(FETCH_INTERVAL_MINUTES).minutes.do(lambda: parse_and_store_data(fetch_bilibili_data()))

    # 每PUSH_INTERVAL_MINUTES分钟检查一次是否有新记录需要推送
    schedule.every(PUSH_INTERVAL_MINUTES).minutes.do(check_and_push)

    logger.info("定时任务已设置,开始运行...")

    # 持续运行定时任务
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("程序被用户中断")
    except Exception as e:
        logger.error("运行定时任务时出错: %s", e)

最后这里我还为这个程序添加了获取进程id的代码,但是感觉不是太好(只是在程序开头处打印),但是相对于一台机器上运行多个同名python程序还是有一定帮助的,在kill的时候会更加方便。

def get_pid():
    """获取当前进程id"""
    pid = os.getpid()
    logger.info("当前进程ID(PID): %s", pid)

这就是主程序的整体内容,其他的就是日志和脚本文件了,这里在使用宝塔自带的python应用部署工具的时候因为版本等原因(应用自带版本)无法部署,于是就直接简单粗暴的在服务器上部署并后台运行了

# 先确保设置编码(避免中文日志问题),再后台运行程序
export PYTHONIOENCODING=utf-8 && nohup python3 main.py > bilibili_notifier.log 2>&1 &

命令解释:

export PYTHONIOENCODING=utf-8 #设置 Python 输出编码为 UTF-8,解决中文日志问题
nohup #no hang up(不挂起),保证退出终端后程序继续运行
python3 main.py #运行你的程序
> bilibili_notifier.log #将程序标准输出重定向到指定日志文件(替代默认的 nohup.out)
2>&1 #将错误输出也重定向到同一个日志文件(方便排查问题)
& #让程序在后台运行

因为在日志文件中的开头会用这个进程的记录,所以可以使用kill直接终止这个程序。

# 先通过ps命令找到进程ID(PID),比如PID是12345
kill 12345

# 如果进程无法正常停止,强制杀死
kill -9 12345

# 项目的日志中会记录当前启动项目的pid,也可以直接从日志中获取pid信息

日志查看:

# 实时查看日志(类似tail -f)
tail -f bilibili_notifier.log

# 查看最新100行日志
tail -n 100 bilibili_notifier.log

# 查看全部日志
cat bilibili_notifier.log

在README中会有详细的使用教程,整个项目的完整度很高,实现起来不是很复杂(因为大多数代码都是AI提供的),对于现在做Agent应用生成来说很多人都会认为不需要自己会写代码,可能随便一个人都能做的这么一种感觉,个人感觉简单的静态网站是完全没有问题的,但是如果牵扯到后台接口调用,代码调试等问题,还是需要能够看懂代码才可以实现的至少在目前来说是这样的,不知道以后会是什么样的,也许有一天程序员真的会成为下一个消失的职业,也许会出现一个新的职业会代替程序员,在下一个风口不知道自己还能走多远。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

*

433 次浏览