video_blacklist/login.py

2301 lines
116 KiB
Python
Raw Normal View History

2025-06-03 18:23:22 +08:00
# -*- coding: utf-8 -*-
import urllib
import webview
# 假设 get_cookie 函数在这个文件中
# 确保 获取cookie.py 文件已更新,特别是清理 '|' 字符的部分
from 获取cookie import get_cookie # 导入 get_cookie 函数
import os
import re
import sqlite3
import sys
import json
import time
import pickle
import requests
from datetime import datetime
import logging
import random
import websocket
from urllib.parse import unquote
import gzip
from protobuf.douyin import *
import execjs
2026-03-26 08:38:46 +08:00
import execjs._external_runtime as _execjs_external_runtime
2025-06-03 18:23:22 +08:00
import hashlib
2026-03-26 08:38:46 +08:00
def _patch_execjs_popen_utf8():
"""
Windows execjs 通过 subprocess.Popen(..., universal_newlines=True) JS 源码时
默认会用系统区域编码 gbk编码字符串h_sign.js 若含泰文等字符会触发
UnicodeEncodeError execjs 已绑定的 Popen 做包装强制 UTF-8
"""
if getattr(_patch_execjs_popen_utf8, "_done", False):
return
_orig = _execjs_external_runtime.Popen
def _popen_utf8(*args, **kwargs):
if kwargs.get("encoding") is None and (
kwargs.get("universal_newlines") or kwargs.get("text")
):
kwargs = dict(kwargs)
kwargs["encoding"] = "utf-8"
kwargs.setdefault("errors", "replace")
return _orig(*args, **kwargs)
_execjs_external_runtime.Popen = _popen_utf8
_patch_execjs_popen_utf8._done = True
_patch_execjs_popen_utf8()
2025-06-03 18:23:22 +08:00
import urllib.parse
from threading import Thread
import urllib3
from typing import Dict, Optional
import threading
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
session = requests.session()
Settings_Options = {}
ID_TO_CONFIG_KEY = {
112: "douyin_dm",
122: "douyin_dm", # 112 和 122 都映射到同一个英文键 "douyin_dm"
2: "wechat_moments",
1: "wechat",
5: "qq",
4: "qq_zone",
3: "weibo",
-1: "link_image"
# 如果有其他ID也在这里添加映射
}
# 中文描述映射表:英文键 -> 中文名称(用于输出)
# 这个主要用于生成最终的用户友好信息
CONFIG_KEY_TO_CHINESE = {
"all": "全部", # "all" 选项的中文描述
"douyin_dm": "抖音私信",
"wechat_moments": "微信朋友圈",
"wechat": "微信",
"qq": "QQ",
"qq_zone": "QQ空间",
"weibo": "微博",
"link_image": "链接图片"
# 确保所有可能出现在 ID_TO_CONFIG_KEY 的值都在这里有中文映射
}
# 注意:这里不再需要 COOKIE_FILE = 'cookies.pkl',因为每个账号对应一个文件
# 定义从 cookie_name.txt 文件读取账号数据的函数
def read_accounts_from_file():
"""从 cookie_name.txt 逐行读取账号数据,返回字典列表。"""
accounts = []
# 检查文件是否存在,如果不存在则返回空列表
if not os.path.isfile('cookie_name.txt'):
print('read_accounts_from_file: 未找到 cookie_name.txt 文件。')
return accounts
try:
# 以文本模式打开文件Python 会自动处理不同换行符
with open('cookie_name.txt', 'r', encoding='utf-8') as f:
seen_urls = set() # 用于跟踪已添加的账号 URL 部分,避免重复添加
# 逐行读取文件
for line_num, raw_line in enumerate(f, 1): # 从 1 开始计数行号
stripped_line = raw_line.strip() # 移除行首尾的空白字符(包括各种换行符)
if not stripped_line: # 跳过空行
continue
# print(f"read_accounts_from_file: 处理第 {line_num} 行: '{stripped_line}'") # 调试输出每条有效行
parts = stripped_line.split('|')
if len(parts) == 3:
url_part, nickname, douyin_id = parts
# 如果 url_part 未见过,则添加
if url_part not in seen_urls:
accounts.append({
'url_part': url_part,
'nickname': nickname,
'douyin_id': douyin_id,
# 添加过期时间占位符 - 之后替换为实际逻辑
'expiry_time': '25年06月06日21时32分' # 硬编码的到期时间
})
seen_urls.add(url_part)
# print(f"read_accounts_from_file: 添加账号: url_part='{url_part}'") # 调试输出添加的账号
# else:
# print(f"read_accounts_from_file: 跳过重复账号 (url_part): '{url_part}'") # 调试输出跳过的重复账号
else:
print(
f"read_accounts_from_file: 跳过格式错误的第 {line_num} 行 (部分数量 != 3): '{stripped_line}'") # 调试输出格式错误的行
except Exception as e:
print(f"read_accounts_from_file: 读取或解析 cookie_name.txt 时出错: {e}")
# 错误时返回空列表
return []
print(f"read_accounts_from_file: 成功解析 {len(accounts)} 个账号。")
# print("read_accounts_from_file: 解析后的账号列表:", accounts) # 调试输出最终账号列表
return accounts
# 读取cookie pkl文件
def load_cookies_from_file(filename):
# 以二进制写入模式打开文件
with open(filename, 'rb') as f:
return pickle.load(f)
def get_roomId(url, Cookie):
print(datetime.now(), '正在请求直播房间')
headers = {
'Connection': 'keep-alive',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
}
for _ in range(10):
res = session.get(url, headers=headers, cookies=Cookie)
# print(res.text)
roomId = re.findall(r'\\"roomId\\":\\"(\d+?)\\"', res.text) if re.findall(r'\\"roomId\\":\\"(\d+?)\\"',
res.text) else ''
if not roomId:
roomId = re.findall(r'\\"roomIdStr\\":\\"(\d+?)\\"', res.text) if re.findall(
r'\\"roomIdStr\\":\\"(\d+?)\\"',
res.text) else ''
if roomId:
return roomId
time.sleep(2)
def update_global_settings(new_settings_dict):
"""
Updates the global Settings_Options dictionary.
This function is intended to be called from main.py.
"""
global Settings_Options
Settings_Options = json.loads(new_settings_dict)
# print(type(Settings_Options))
# 拉黑
def lahei(sec_user_id, user_id, Cookie):
# 拉黑对方需要sec_user_id和user_id
# sec_user_idMS4wLjABAAAAhZ85ZW5BVlcjvTvpTlA6rcGIr_nhpzCSS42uSENLEqE主页后面
# user_idto_uid
for __ in range(10):
try:
url = "https://www-hj.douyin.com/aweme/v1/web/user/block/?device_platform=webapp&aid=6383&channel=channel_pc_web&pc_client_type=1&pc_libra_divert=Windows&update_version_code=170400&version_code=170400&version_name=17.4.0&cookie_enabled=true&screen_width=2560&screen_height=1440&browser_language=zh-CN&browser_platform=Win32&browser_name=Chrome&browser_version=131.0.0.0&browser_online=true&engine_name=Blink&engine_version=131.0.0.0"
# payload = "block_type=1&sec_user_id=MS4wLjABAAAAhZ85ZW5BVlcjvTvpTlA6rcGIr_nhpzCSS42uSENLEqE&source=0&user_id=61163251429"
payload = f"block_type=1&sec_user_id={sec_user_id}&source=0&user_id={user_id}"
headers = {
'Connection': 'keep-alive',
'Content-Length': '110',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
'sec-ch-ua-platform': '"Windows"',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'sec-ch-ua': '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'sec-ch-ua-mobile': '?0',
'Origin': 'https://www.douyin.com',
'Sec-Fetch-Site': 'same-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Referer': 'https://www.douyin.com/',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cookie': Cookie
# 'Cookie': 'ttwid=1%7CU9RBkam_C1udCzD15RdVkhD45xIsm7PIoQlTltJYCTk%7C1732686970%7Cdc4ddd594048baeaa3df7dcdc967e36006df40e0cc901d457382909db2394dc0; UIFID_TEMP=c4a29131752d59acb78af076c3dbdd52744118e38e80b4b96439ef1e20799db082f1380d5cd13efd2ef3c0bf18206e4bfbe0e3610d3ab52bee57c5fcfdc2e02c97ca6e46bdd3de6f984ea807009371b5db9506690172d0a9a5cc4ef43373f876cd4bd5a4eac0347fff8889821f134cf0; hevc_supported=true; strategyABtestKey=%221732686972.739%22; volume_info=%7B%22isUserMute%22%3Afalse%2C%22isMute%22%3Afalse%2C%22volume%22%3A0.5%7D; FORCE_LOGIN=%7B%22videoConsumedRemainSeconds%22%3A180%7D; passport_csrf_token=52363725d47dd1229c220f0470e2f2d7; passport_csrf_token_default=52363725d47dd1229c220f0470e2f2d7; bd_ticket_guard_client_web_domain=2; d_ticket=935e0100ebe543aeb19fb88474608a1615f45; passport_assist_user=CkF9ZXWVA-iZbKerIKHDaEp1xcJZ5lh4uYUgNu1ymCM1HYElnwC0ln3nqAxRA8ByxW37RMx2pswZ8MEQWnOx-MGStBpKCjyc330xG_wrxyc791bEObH0B7n-lL2ZJD6OxxfDqLpVB69azowaOsHM72XIrf2i8PVqW5FEu4LZMQRDvogQhsriDRiJr9ZUIAEiAQMmaH5v; n_mh=07vLiXVhkr6Gw426Bs6p1qWtmm_l_Xu5Jlf1_2HXC3I; sso_uid_tt=f65780cd0e55de027bf9299f9b7dec53; sso_uid_tt_ss=f65780cd0e55de027bf9299f9b7dec53; toutiao_sso_user=331fa717c329a4f4f115118fe1f6f684; toutiao_sso_user_ss=331fa717c329a4f4f115118fe1f6f684; sid_ucp_sso_v1=1.0.0-KDY0MjVkYmNhNjBmMDUwYTRjYjQ2MDY1ZGQyNDZmZTMwMGNiZjg3NWYKIQir86C94o2KBBCZ6Zq6BhjvMSAMMM31iY8GOAZA9AdIBhoCbGYiIDMzMWZhNzE3YzMyOWE0ZjRmMTE1MTE4ZmUxZjZmNjg0; ssid_ucp_sso_v1=1.0.0-KDY0MjVkYmNhNjBmMDUwYTRjYjQ2MDY1ZGQyNDZmZTMwMGNiZjg3NWYKIQir86C94o2KBBCZ6Zq6BhjvMSAMMM31iY8GOAZA9AdIBhoCbGYiIDMzMWZhNzE3YzMyOWE0ZjRmMTE1MTE4ZmUxZjZmNjg0; login_time=1732687003098; passport_auth_status=0d63f05f5920f5fa86371e2c328c60e7%2C; passport_auth_status_ss=0d63f05f5920f5fa86371e2c328c60e7%2C; uid_tt=fe90a08ede420f1008382babb4a01af2; uid_tt_ss=fe90a08ede420f1008382babb4a01af2; sid_tt=27847273045ab5d1055ce9cc29a4cf90; sessionid=27847273045ab5d1055ce9cc29a4cf90; sessionid_ss=27847273045ab5d1055ce9cc29a4cf90; is_staff_user=false; publish_badge_show_info=%220%2C0%2C0%2C1732687004013%22; SelfTabRedDotControl=%5B%5D; _bd_ticket_crypt_doamin=2; _bd_ticket_crypt_cookie=388b6415a7b774ecbb771fd4e6bb8f78; __security_server_data_status=1; sid_guard=27847273045ab5d1055ce9cc29a4cf90%7C1732687005%7C5183999%7CSun%2C+26-Jan-2025+05%3A56%3A44+GMT; sid_ucp_v1=1.0.0-KDk2M2NmYmU2NjkxZTQxYTUwOGM4NzgxMmNhZmQ0YjUxZWYwODBlZmMKGwir86C94o2KBBCd6Zq6BhjvMSAMOAZA9AdIBBoCaGwiIDI3ODQ3MjczMDQ1YWI1ZDEwNTVjZTljYzI5YTRjZjkw; ssid_ucp_v1=1.0.0-KDk2M2NmYmU2NjkxZTQxYTUwOGM4NzgxMmNhZmQ0YjUxZWYwODBlZmMKGwir86C94o2KBBCd6Zq6BhjvMSAMOAZA9AdIBBoCaGwiIDI3ODQ3MjczMDQ1YWI1ZDEwNTVjZTljYzI5YTRjZjkw; store-region=cn-fj; store-region-src=uid; biz_trace_id=41025dff; is_dash_user=1; SEARCH_RESULT_LIST_TYPE=%22single%22; pwa2=%220%7C0%7C3%7C0%22; my_rd=2; live_use_vvc=%22false%22; h265ErrorNum=-1; webcast_local_quality=origin; stream_player_status_params=%22%7B%5C%22is_auto_play%5C%22%3A0%2C%5C%22is_full_screen%5C%22%3A0%2C%5C%22is_full_webscreen%5C%22%3A0%2C%5C%22is_mute%5C%22%3A0%2C%5C%22is_speed%5C%22%3A1%2C%5C%22is_visible%5C%22%3A0%7D%22; __live_version__=%221.1.2.5721%22; live_can_add_dy_2_desktop=%221%22; stream_recommend_feed_params=%22%7B%5C%22cookie_enabled%5C%22%3Atrue%2C%5C%22screen_width%5C%22%3A2560%2C%5C%22screen_height%5C%22%3A1440%2C%5C%22browser_online%5C%22%3Atrue%2C%5C%22cpu_core_num%5C%22%3A28%2C%5C%22device_memory%5C%22%3A8%2C%5C%22downlink%5C%22%3A10%2C%5C%22effective_type%5C%22%3A%5C%224g%5C%22%2C%5C%22round_trip_time%5C%22%3A0%7D%22; FOLLOW_LIVE_POINT_INFO=%22MS4wLjABAAAAcfYfvFRPvoxRdGJHhNk26Vx0J0Nf9RmuUggT6CeeVxivnBYc1nRoJTwq_N2a6dJs%2F1732723200000%2F0%2F0%2F1732697858379%22; FOLLOW_NUMBER_YELLOW_POINT_INFO=%22MS4wLjABAAAAcfYfvFRPvoxRdGJHhNk26Vx0J0Nf9RmuUggT6CeeVxivnBYc1nRoJTwq_N2a6dJs%2F1732723200000%2F0%2F1732697258380%2F0%22; bd_ticket_guard_client_data=eyJiZC10aWNrZXQtZ3VhcmQtdmVyc2lvbiI6MiwiYmQtdGlja2V0LWd1YXJkLWl0ZXJhdGlvbi12ZXJzaW9uIjoxLCJiZC10aWNrZXQtZ3VhcmQtcmVlLXB1YmxpYy1rZXkiOiJCTmRrNUtCTjlJMWk4TW15V1RVMzdxcC9aSEI4WWU2My81M0VDdFF6Nk9pWk4wRVl0K2V4M1UydFZ1bmZ4dFM0V0NxYnVTd1U0NE9HeTdHdFp
}
response = requests.request("POST", url, headers=headers, data=payload, timeout=3, verify=False)
# '{"status_code":0,"block_status":1,"extra":{"now":1732697940000,"fatal_item_ids":[],"logid":"20241127165900C4E8A38FAAFFE50C3E07"},"log_pb":{"impr_id":"20241127165900C4E8A38FAAFFE50C3E07"}}'
if '"status_code":0' in response.text:
return True
else:
# print(response.text)
return False
except Exception as e:
print('lahei', e)
return False
# 解除拉黑
def jclahei(sec_user_id, Cookie, user_id='7454438944119719433'):
'''
:param sec_user_id:
:param user_id:抖音的user_unique_id值
:param Cookie:
:return:
'''
for __ in range(10):
try:
url = "https://www.douyin.com/aweme/v1/web/user/block/?device_platform=webapp&aid=6383&channel=channel_pc_web&pc_client_type=1&pc_libra_divert=Windows&update_version_code=170400&version_code=170400&version_name=17.4.0&cookie_enabled=true&screen_width=2560&screen_height=1440&browser_language=zh-CN&browser_platform=Win32&browser_name=Chrome&browser_version=131.0.0.0&browser_online=true&engine_name=Blink&engine_version=131.0.0.0&os_name=Windows&os_version=10&cpu_core_num=28&device_memory=8&platform=PC&downlink=10&effective_type=4g&round_trip_time=0"
payload = f"block_type=0&sec_user_id={sec_user_id}&source=0&user_id={user_id}"
headers = {
'Connection': 'keep-alive',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
'sec-ch-ua-platform': '"Windows"',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'x-secsdk-csrf-token': 'fHFd=f24ms8IC7K0tv7Zr5yGw69vgstUyiXltzg3pxxtfaQOTRCbXlMU08taG5frPAnrKFo3FN6gXANTYVyn47qi=KY6IgaSiTYosAF939T',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'Referer': f'https://www.douyin.com/user/{sec_user_id}',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
# 'Cookie': 'ttwid=1%7CKWXadMDhW9Ryw_ADwbeFK_FV9H-mbn3vFxyte2YAZL8%7C1735621839%7C2023a3817009a569d35894b430f6575ba57c352aa1c8d409409e8419fe2eeac2; UIFID_TEMP=c4a29131752d59acb78af076c3dbdd52744118e38e80b4b96439ef1e20799db02e8e7a5a86363a89fcec77e2d646615eca416b0564627981e02f280480cc7a0b5215a3f91585358e92dd2f7c4807c5002acaaeb100519902234feddc5b68b02bdfe0e29a074b2bf5645cc1afe4195533; s_v_web_id=verify_m5c0fj24_two4VH2Q_uopw_4bO1_9hhn_C0Tcq87kZcJx; hevc_supported=true; dy_swidth=2560; dy_sheight=1440; my_rd=2; fpk1=U2FsdGVkX19XIyl6qGLvr1yaDzNIntOXA6m8G57S4sifGptdOC7CTzX9RIcI5U2LTTFFHXrkPzUGHbjQ7CfRqw==; fpk2=f51bb482c660d0eeadd1f058058a2b35; bd_ticket_guard_client_web_domain=2; passport_csrf_token=8521596f974a978da1e4b354eb1d35c1; passport_csrf_token_default=8521596f974a978da1e4b354eb1d35c1; UIFID=c4a29131752d59acb78af076c3dbdd52744118e38e80b4b96439ef1e20799db02e8e7a5a86363a89fcec77e2d646615eca416b0564627981e02f280480cc7a0bed8fe6dddf462035cabdbffe88852cd4b880a1ac63ce66c05dd898d871d0d9a56122b3387b3aa1bca9b18d5e63727b923ccb09fb874d403178c3c7e258b20a2d2fd65f7e5e111c28ea1fa9ccfbeca9d86001470e9b5c68ef98f37b8469b938d87d0877f83a459825f9347573727217b7b20b2bedc1b0bbef9547713cd654f778; FORCE_LOGIN=%7B%22videoConsumedRemainSeconds%22%3A180%2C%22isForcePopClose%22%3A1%7D; is_dash_user=1; __ac_nonce=06775f84a00347ad53f19; __ac_signature=_02B4Z6wo00f01zRL7KAAAIDDA28o.PttSTc0a-gAAKpx60; csrf_session_id=6432c96e165f150b78c22e8447711030; passport_assist_user=Cjzgvg6j8zBZnJqRpkCuCFVCSD6GgohNh0pytKoNp5NXG8lLVmaH3x7_XmHVcBTSbN2B6zNCOQI6-gL6geAaSgo85CasOHZYo_u95TNfT5dF1YykMfnegAC-GyhVlQFvWalAh_Fm3XQuAdpHKHIKdDy5ZQZ441_9kQQZZWeXEPrc5Q0Yia_WVCABIgED7P5X0Q%3D%3D; n_mh=9-mIeuD4wZnlYrrOvfzG3MuT6aQmCUtmr8FxV8Kl8xY; sso_uid_tt=183c57659c43eeceaed318796ff7f102; sso_uid_tt_ss=183c57659c43eeceaed318796ff7f102; toutiao_sso_user=ca675dcc5f006efc313dc69f7e3e0ccb; toutiao_sso_user_ss=ca675dcc5f006efc313dc69f7e3e0ccb; sid_ucp_sso_v1=1.0.0-KGY3MDY2NWZlZjFlNTk2NDRjOTc4ZTc2YzdhOGIxNmY4YjFlYjEzMDYKHQicjOfinAIQ3vDXuwYY7zEgDDC7psTQBTgGQPQHGgJscSIgY2E2NzVkY2M1ZjAwNmVmYzMxM2RjNjlmN2UzZTBjY2I; ssid_ucp_sso_v1=1.0.0-KGY3MDY2NWZlZjFlNTk2NDRjOTc4ZTc2YzdhOGIxNmY4YjFlYjEzMDYKHQicjOfinAIQ3vDXuwYY7zEgDDC7psTQBTgGQPQHGgJscSIgY2E2NzVkY2M1ZjAwNmVmYzMxM2RjNjlmN2UzZTBjY2I; login_time=1735784542133; passport_auth_status=577638629195a1c78981dcc18eb6914d%2C; passport_auth_status_ss=577638629195a1c78981dcc18eb6914d%2C; uid_tt=be33504736d799b7e42458a52241a944; uid_tt_ss=be33504736d799b7e42458a52241a944; sid_tt=3a7970ea751f676dce010b635f8d19b1; sessionid=3a7970ea751f676dce010b635f8d19b1; sessionid_ss=3a7970ea751f676dce010b635f8d19b1; is_staff_user=false; _bd_ticket_crypt_doamin=2; _bd_ticket_crypt_cookie=c2194fa0fb2ed4598e93bbead8b2ec09; __security_server_data_status=1; publish_badge_show_info=%220%2C0%2C0%2C1735784550346%22; passport_fe_beating_status=true; sid_guard=3a7970ea751f676dce010b635f8d19b1%7C1735784552%7C5183993%7CMon%2C+03-Mar-2025+02%3A22%3A25+GMT; sid_ucp_v1=1.0.0-KGUyNWE1YzdkYjc1Zjg3ODMzZDQ4NTFjYjU4ODhlN2Y1MjI2ODkxZWEKGQicjOfinAIQ6PDXuwYY7zEgDDgGQPQHSAQaAmxxIiAzYTc5NzBlYTc1MWY2NzZkY2UwMTBiNjM1ZjhkMTliMQ; ssid_ucp_v1=1.0.0-KGUyNWE1YzdkYjc1Zjg3ODMzZDQ4NTFjYjU4ODhlN2Y1MjI2ODkxZWEKGQicjOfinAIQ6PDXuwYY7zEgDDgGQPQHSAQaAmxxIiAzYTc5NzBlYTc1MWY2NzZkY2UwMTBiNjM1ZjhkMTliMQ; FOLLOW_NUMBER_YELLOW_POINT_INFO=%22MS4wLjABAAAAHZbJJ5e8g4Lk0PIezttIs-bJdhhLb79nOZeFZVjop6o%2F1735833600000%2F1735784585664%2F1735784545513%2F0%22; SelfTabRedDotControl=%5B%5D; volume_info=%7B%22isUserMute%22%3Afalse%2C%22isMute%22%3Afalse%2C%22volume%22%3A0.5%7D; strategyABtestKey=%221735784593.663%22; biz_trace_id=18f02cfb; store-region=cn-fj; store-region-src=uid; stream_recommend_feed_params=%22%7B%5C%22cookie_enabled%5C%22%3Atrue%2C%5C%22screen_width%5C%22%3A2560%2C%5C%22screen_height%5C%22%3A1440%2C%5C%22browser_online%5C%22%3Atrue%2C%5C%22cpu_core_num%5C%22%3A28%2C%5C%22device_memory%5C%22%3A8%2C%5C%22downlink%5C%22%3A10%2C%5C%22effective_type%5C%22%3A%5C%224g%5C%22%2C%5C%22round_trip_time%5C%22%3A0%7D%22; download_guide=%223%2F20250102%2F0%22; IsDouyinActive=true; home_can_add_dy_2_deskto
}
response = requests.request("POST", url, headers=headers, data=payload, timeout=3, verify=False,
cookies=Cookie)
if response.status_code == 200:
if '"status_code":0' in response.text:
return True
else:
# print(response.text)
return response.text
else:
return response.status_code
except Exception as e:
print('jclahei', e)
return False
# 读取cookie
def load_cookies_from_file(filename):
# 以二进制写入模式打开文件
with open(filename, "rb") as f:
return pickle.load(f)
# 取随机ua
def zhengchang_ua(i):
# return 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36 Edg/106.0.1370.42'
if i == 0:
first_num = random.randint(55, 62)
third_num = random.randint(0, 3200)
fourth_num = random.randint(0, 140)
os_type = [
"(Linux; Android 10; MI 9 Build/QKQ1.190825.002; wv)",
"(Linux; Android 8.1.0; PBCM30 Build/OPM1.171019.011; wv)",
"(Linux; Android 6.0; HUAWEI MLA-AL10 Build/HUAWEIMLA-AL10; wv)",
"(Linux; Android 6.0.1; OPPO A57t Build/MMB29M; wv)",
"(iPhone; CPU iPhone OS 10_3_3 like Mac OS X)",
"(Linux; U; Android 9; zh-cn; Redmi 7 Build/PKQ1.181021.001)",
"(Linux; Android 9; V1914A Build/PKQ1.181016.001; wv)",
"(Linux; U; Android 10; zh-CN; ALP-AL00 Build/HUAWEIALP-AL00)",
"(Linux; Android 10; VOG-TL00 Build/HUAWEIVOG-TL00; wv)",
"(Linux; Android 9; Nokia X7 Build/PPR1.180610.011; wv)",
"(Linux; Android 9.0; S5 Build/PKQ1.190319.001; wv)",
"(Linux; Android 10; PCT-AL10 Build/HUAWEIPCT-AL10; wv)",
"(Linux; Android 10; PCT-AL10 Build/HUAWEIPCT-AL10; wv)",
"(Linux; Android 9; Mi9 Pro 5G Build/PKQ1.190714.001; wv)",
"(iPhone; CPU iPhone OS 13_4 like Mac OS X)",
"(Linux; Android 10; OXF-AN00 Build/HUAWEIOXF-AN00; wv)",
"(Linux; U; Android 9; zh-cn; V1814A Build/PKQ1.180819.001)",
"(Linux; U; Android 7.0; zh-cn; Redmi Note 4X Build/NRD90M)",
"(Linux; Android 9; RMX1991 Build/PKQ1.190630.001; wv)",
"(Linux; Android 10; JNY-AL10 Build/HUAWEIJNY-AL10; wv)",
"(Linux; Android 10; ELE-AL00 Build/HUAWEIELE-AL00; wv)",
"(Linux; Android 9; RMX1991 Build/PKQ1.190630.001; wv)",
"(Linux; U; Android 10; zh-cn; MI 8 Build/QKQ1.190828.002)",
"(Linux; Android 10; VCE-AL00 Build/HUAWEIVCE-AL00; wv)",
"(Linux; Android 8.0.0; AUM-TL20 Build/HONORAUM-TL20; wv)",
"(Linux; Android 10; ALP-AL00 Build/HUAWEIALP-AL00; wv)",
"(Linux; U; Android 10; zh-cn; MI 8 SE Build/QKQ1.190828.002)",
"(Linux; Android 7.1.1; OPPO A83 Build/N6F26Q; wv)",
"(Linux; Android 10; VOG-AL10 Build/HUAWEIVOG-AL10; wv)",
"(iPhone; CPU iPhone OS 13_3_1 like Mac OS X)",
"(Linux; U; Android 10; zh-cn; MI 9 Build/QKQ1.190825.002)",
"(Linux; Android 9; INE-TL00 Build/HUAWEIINE-TL00; wv)",
"(Linux; Android 9; ONEPLUS A6000 Build/PKQ1.180716.001; wv)",
"(Linux; U; Android 9; zh-cn; Mi9 Pro 5G Build/PKQ1.190714.001)",
"(Linux; Android 8.1.0; OPPO R11 Plus Build/OPM1.171019.011; wv)",
"(Linux; Android 8.1.0; COR-AL00 Build/HUAWEICOR-AL00; wv)",
"(Linux; Android 7.1.2; M15 Build/N2G47H; wv)",
"(Linux; arm_64; Android 7.0; LG-H870DS)",
"(Linux; U; Android 9; zh-cn; MI CC9 Pro Build/PKQ1.190302.001)",
"(Linux; Android 10; SM-G9730 Build/QP1A.190711.020; wv)",
"(Linux; Android 7.1.2; Hisense A2T Build/N2G47H; wv)",
"(Linux; U; Android 8.1.0; zh-CN; 16th Plus Build/OPM1.171019.026)",
"(Linux; U; Android 9; zh-cn; MI 8 Lite Build/PKQ1.181007.001)",
"(Linux; Android 9; PAAM00 Build/PKQ1.190414.001; wv)",
"(iPhone; CPU iPhone OS 10_1_1 like Mac OS X)",
"(iPhone; CPU iPhone OS 11_4_1 like Mac OS X)",
"(Linux; Android 9; V1914A Build/PKQ1.181016.001; wv)",
"(Linux; Android 10; PCT-AL10 Build/HUAWEIPCT-AL10; wv)",
]
chrome_version = "Chrome/{}.0.{}.{}".format(first_num, third_num, fourth_num)
ua = " ".join(
[
"Mozilla/5.0",
random.choice(os_type),
"AppleWebKit/537.36",
"(KHTML, like Gecko)",
chrome_version,
"Safari/537.36",
]
)
return ua
elif i == 1:
os_type = [
"(Windows NT 6.1; WOW64)",
"(Windows NT 6.0; WOW64)",
"(Windows NT 6.2; WOW64)",
"(Windows NT 6.3; WOW64)",
"(Windows NT 5.1; WOW64)",
"(Windows NT 5.0; WOW64)",
"(Windows NT 5.2; WOW64)",
"(Windows NT 5.3; WOW64)",
"(Windows NT 10.1; WOW64)",
"(Windows NT 10.0; WOW64)",
"(Windows NT 10.2; WOW64)",
"(Windows NT 10.3; WOW64)",
"(Windows NT 6.1; WOW64; X64)",
"(Windows NT 6.0; WOW64; X64)",
"(Windows NT 6.2; WOW64; X64)",
"(Windows NT 6.3; WOW64; X64)",
"(Windows NT 5.1; WOW64; X64)",
"(Windows NT 5.0; WOW64; X64)",
"(Windows NT 5.2; WOW64; X64)",
"(Windows NT 5.3; WOW64; X64)",
"(Windows NT 10.1; WOW64; X64)",
"(Windows NT 10.0; WOW64; X64)",
"(Windows NT 10.2; WOW64; X64)",
"(Windows NT 10.3; WOW64; X64)",
"(Windows NT 6.1)",
"(Windows NT 6.0)",
"(Windows NT 6.2)",
"(Windows NT 6.3)",
"(Windows NT 5.1)",
"(Windows NT 5.0)",
"(Windows NT 5.2)",
"(Windows NT 5.3)",
"(Windows NT 10.1)",
"(Windows NT 10.0)",
"(Windows NT 10.2)",
"(Windows NT 10.3)",
]
banben = (
str(random.randint(47, 170))
+ ".0."
+ str(random.randint(2000, 2755))
+ "."
+ str(random.randint(0, 25))
)
ua = " ".join(
[
"Mozilla/5.0",
random.choice(os_type),
"AppleWebKit/537.36",
"(KHTML, like Gecko)",
"Chrome/" + banben,
"Safari/537.36",
]
)
return ua
elif i == 2:
os_type = [
"(Macintosh; U; PPC Mac OS X 10_5_8; ja-jp)",
"(Windows; U; Windows NT 6.1; cs-CZ)",
"(Windows; U; Windows NT 6.0; tr-TR)",
"(Windows; U; Windows NT 5.1; ru-RU)",
"(Windows; U; Windows NT 6.0; tr-TR)",
"(Macintosh; Intel Mac OS X 10_6_8)",
"(Windows; U; Windows NT 6.1; zh-HK)",
"(Windows; U; Windows NT 6.0; ja-JP)",
"(Windows; U; Windows NT 6.0; hu-HU)",
"(Macintosh; U; Intel Mac OS X 10_6_6; de-de)",
"(Windows; U; Windows NT 6.1; ko-KR)",
"(Macintosh; Intel Mac OS X 10_7_3)",
"(Macintosh; U; PPC Mac OS X 10_5_8; ja-jp)",
"(Windows; U; Windows NT 6.1; sv-SE)",
"(Macintosh; U; Intel Mac OS X 10_6_6; fr-ch)",
"(Windows; U; Windows NT 6.1; de-DE)",
"(Windows; U; Windows NT 6.0; de-DE)",
"(Macintosh; U; Intel Mac OS X 10_5_8; zh-cn)",
"(Macintosh; U; Intel Mac OS X 10_6_6; ja-jp)",
"(Windows; U; Windows NT 6.0; en-US)",
]
banben = (
str(random.randint(533, 534))
+ "."
+ str(random.randint(18, 21))
+ "."
+ str(random.randint(1, 27))
)
chrome_version = [
"Version/5.0.3",
"Version/5.0.4",
"Version/5.0.2",
"Version/7.0.3",
"Version/6.0 Mobile/10A5355d",
"Version/5.1.7",
"Version/5.1.6",
"Version/5.1.3",
"Version/5.1.4",
"Version/5.1.2",
"Version/5.1.3",
]
ua = " ".join(
[
"Mozilla/5.0",
random.choice(os_type),
"AppleWebKit/" + banben,
"(KHTML, like Gecko)",
random.choice(chrome_version),
"Safari/" + banben,
]
)
return ua
elif i == 3:
os_type = [
"(Windows NT 10.0; Win64; x64; rv:",
"(Windows NT 6.3; Win64; x64; rv:",
"(Windows NT 6.2; Win64; x64; rv:",
"(Windows NT 10; rv:",
"(Windows NT 6.3; rv:",
"(Windows NT 6.2; rv:",
"(Windows NT 6.1; rv:",
"(Windows NT 6.1; Win64; x64; rv:",
"(X11; Linux i586; rv:",
"(Windows NT 10.0; Win64; x64; rv:",
]
banben = str(random.randint(20, 50)) + ".0"
chrome_version = [
"Gecko/20100101 Firefox/",
"Gecko/20130405 Firefox/",
"Gecko/20130330 Firefox/",
"Gecko/20130430 Firefox/",
"Gecko/20140230 Firefox/",
"Gecko/20140630 Firefox/",
"Gecko/20140512 Firefox/",
"Gecko/20140712 Firefox/",
"Gecko/20100502 Firefox/",
]
ua = " ".join(
[
"Mozilla/5.0",
random.choice(os_type) + banben + ")",
random.choice(chrome_version) + banben,
]
)
return ua
elif i == 4:
os_type = [
"(Macintosh; U; Intel Mac OS X 10_6_7; ",
"(Macintosh; U; Intel Mac OS X 10_6_5; ",
"(Windows; U; Windows NT 6.1; ",
"(Macintosh; U; Intel Mac OS X 10_6_6; ",
"(Windows; U; Windows NT 6.0; ",
"(Windows; U; Windows NT 6.1; ",
"(Windows; U; Windows NT 5.1; ",
"(Macintosh; U; Intel Mac OS X 10_6_6; ",
"(Macintosh; U; PPC Mac OS X 10_5_8;",
"(Macintosh; U; Intel Mac OS X 10_6_7;",
"(Macintosh; U; Intel Mac OS X 10_6_5;",
"(Macintosh; U; Intel Mac OS X 10_6_6;",
]
diqu = [
"ja-jp)",
"de-de)",
"de-DE)",
"fr-ch)",
"en-US)",
"cs-CZ)",
"it-it)",
"ko-kr)",
"da-dk)",
"ar)",
"hu-HU)",
"ko-KR)",
"es-es)",
"zh-HK)",
"en-gb)",
]
banben = (
str(random.randint(532, 535))
+ "."
+ str(random.randint(19, 55))
+ "."
+ str(random.randint(1, 25))
)
ua = " ".join(
[
"Mozilla/5.0",
random.choice(os_type) + random.choice(diqu),
"AppleWebKit/" + banben,
"(KHTML, like Gecko)",
"Version/5.0." + str(random.randint(1, 5)),
"Safari/" + banben,
]
)
return ua
elif i == 5:
os_type = [
"(Windows; U; Windows NT 5.1",
"(Macintosh; U; Intel Mac OS X 10_6_6",
"(Macintosh; U; PPC Mac OS X 10_5_8",
"(Macintosh; U; Intel Mac OS X 10_6_7",
"(Macintosh; U; Intel Mac OS X 10_6_5",
"(Macintosh; U; Intel Mac OS X 10_6_6",
"(Macintosh; AMD Mac OS X 10_8_2",
"(Macintosh; Intel Mac OS X 10_10_1",
"(Macintosh; Intel Mac OS X 10_5_8",
"(Macintosh; Intel Mac OS X 10_6_0",
"(Macintosh; Intel Mac OS X 10_6_2",
"(Macintosh; Intel Mac OS X 10_6_3",
"(Macintosh; Intel Mac OS X 10_6_4",
"(Macintosh; Intel Mac OS X 10_6_5",
"(Macintosh; Intel Mac OS X 10_6_6",
"(Macintosh; Intel Mac OS X 10_6_7",
"(Macintosh; Intel Mac OS X 10_6_8",
"(Macintosh; Intel Mac OS X 10_7_0",
"(Macintosh; Intel Mac OS X 10_7_1",
"(Macintosh; Intel Mac OS X 10_7_2",
"(Macintosh; Intel Mac OS X 10_7_3",
"(Macintosh; Intel Mac OS X 10_7_4",
"(Macintosh; Intel Mac OS X 10_7_5",
"(Macintosh; Intel Mac OS X 10_8_0",
"(Macintosh; Intel Mac OS X 10_8_1",
"(Macintosh; Intel Mac OS X 10_8_2",
"(Macintosh; Intel Mac OS X 10_8_3",
"(Macintosh; Intel Mac OS X 10_9_0",
"(Macintosh; Intel Mac OS X 10_9_1",
"(Macintosh; Intel Mac OS X 10_9_2",
"(Macintosh; Intel Mac OS X 10_9_3",
"(Macintosh; PPC Mac OS X 10_6_7",
"(Macintosh; U; Intel Mac OS X 10_5_6",
"(Macintosh; U; Intel Mac OS X 10_5_7",
"(Macintosh; U; Intel Mac OS X 10_5_8",
"(Macintosh; U; Intel Mac OS X 10_5_9",
"(Macintosh; U; Intel Mac OS X 10_6_0",
"(Macintosh; U; Intel Mac OS X 10_6_1",
"(Macintosh; U; Intel Mac OS X 10_6_2",
"(Macintosh; U; Intel Mac OS X 10_6_3",
"(Macintosh; U; Intel Mac OS X 10_6_4",
"(Macintosh; U; Intel Mac OS X 10_6_5",
"(Macintosh; U; Intel Mac OS X 10_6_6",
"(Macintosh; U; Intel Mac OS X 10_6_7",
"(Macintosh; U; Intel Mac OS X 10_6_8",
"(Macintosh; U; Intel Mac OS X 10_8",
]
diqu = [
"; ja-jp)",
"; e-de)",
"; de-DE)",
"; fr-ch)",
"; en-US)",
"; cs-CZ)",
"; it-it)",
"; ko-kr)",
"; da-dk)",
"; ar)",
"; hu-HU)",
"; ko-KR)",
"; es-es)",
"; zh-HK)",
"; en-gb)",
"; id-ID)",
]
banben = (
str(random.randint(500, 535))
+ "."
+ str(random.randint(0, 55))
+ "."
+ str(random.randint(0, 25))
)
ua = " ".join(
[
"Mozilla/5.0",
random.choice(os_type) + random.choice(diqu),
"AppleWebKit/" + banben,
"(KHTML, like Gecko)",
"Version/5.0." + str(random.randint(1, 9)),
"Safari/" + banben,
]
)
return ua
else:
os_type = [
"(iPhone; U; CPU iPhone OS 4_3_1 like Mac OS X",
"(iPhone; U; CPU iPhone OS 4_2_1 like Mac OS X",
"(iPhone; U; CPU iPhone OS 4_3 like Mac OS X",
"(iPhone; U; CPU iPhone OS 4_2_4 like Mac OS X",
"(iPhone; CPU iPhone OS 12_1_4 like Mac OS X",
"(iPhone; CPU iPhone OS 12_0 like Mac OS X",
"(iPhone; CPU iPhone OS 11_2 like Mac OS X",
"(iPhone 6s; CPU iPhone OS 11_4_1 like Mac OS X",
"(iPhone; CPU iPhone OS 10_1 like Mac OS X",
"(iPhone; CPU iPhone OS 11_0_2 like Mac OS X",
"(iPhone; CPU iPhone OS 11_4_1 like Mac OS X",
"(iPhone; CPU iPhone OS 10_1_1 like Mac OS X",
"(iPhone; CPU iPhone OS 13_5_1 like Mac OS X",
"(iPhone; CPU iPhone OS 8_3 like Mac OS X",
"(iPhone; CPU iPhone OS 14_4 like Mac OS X",
"(iPhone; CPU iPhone OS 13_6 like Mac OS X",
"(iPhone; CPU iPhone OS 14_1 like Mac OS X",
"(iPhone; CPU iPhone OS 13_6_1 like Mac OS X",
]
diqu = [
"; ja-jp)",
"; e-de)",
"; de-DE)",
"; fr-ch)",
"; en-US)",
"; cs-CZ)",
"; it-it)",
"; ko-kr)",
"; da-dk)",
"; ar)",
"; hu-HU)",
"; ko-KR)",
"; es-es)",
"; zh-HK)",
"; en-gb)",
"; id-ID)",
]
banben = (
str(random.randint(500, 535))
+ "."
+ str(random.randint(0, 55))
+ "."
+ str(random.randint(0, 25))
)
mobi = [
"Version/6.0 Mobile/10A5355d",
"Version/5.1 Mobile/9B176",
"Version/5.0.2 Mobile/8J2",
"Version/5.0.2 Mobile/8G4",
"Version/5.0.2 Mobile/8C148",
"Version/5.0.2 Mobile/8C148a",
"Version/5.0.2 Mobile/8G4",
"Version/5.0.2 Mobile/8F190",
"Version/4.0.5 Mobile/8B5097d",
"Version/4.0.5 Mobile/8B117",
"Version/4.0.4 Mobile/7B314",
"Version/4.0.4 Mobile/7B334b",
"Version/4.0.4 Mobile/7D11",
"Version/4.0.4 Mobile/7B500",
"Version/4.0.4 Mobile/7B360",
"Version/3.1.1 Mobile/5G77",
]
ua = " ".join(
[
"Mozilla/5.0",
random.choice(os_type) + random.choice(diqu),
"AppleWebKit/" + banben,
"(KHTML, like Gecko)",
random.choice(mobi),
"Safari/" + banben,
]
)
return ua
# wss链接加密 获取 signature
def generateSignature(wss):
"""
2026-03-26 08:38:46 +08:00
若出现 gbk 无法编码 JS 源码已在文件顶部对 execjs._external_runtime.Popen UTF-8 包装
无需再改标准库 subprocess
2025-06-03 18:23:22 +08:00
"""
params = (
"live_id,aid,version_code,webcast_sdk_version,"
"room_id,sub_room_id,sub_channel_id,did_rule,"
"user_unique_id,device_platform,device_type,ac,"
"identity"
).split(",")
wss_params = urllib.parse.urlparse(wss).query.split("&")
wss_maps = {i.split("=")[0]: i.split("=")[-1] for i in wss_params}
tpl_params = [f"{i}={wss_maps.get(i, '')}" for i in params]
param = ",".join(tpl_params)
md5 = hashlib.md5()
md5.update(param.encode())
md5_param = md5.hexdigest()
# with codecs.open('D:\job_demo\demo\day04\h_sign.js', 'r', encoding='utf8') as f:
# script = f.read()
# ctx = MiniRacer()
# ctx.eval(script)
is_pack = getattr(sys, 'frozen', False)
if is_pack:
run_path = sys._MEIPASS
else:
run_path = os.getcwd()
js_file_path = os.path.join(run_path, 'h_sign.js')
with open(js_file_path, "r", encoding="utf-8") as f:
js_code = f.read()
ctx = execjs.compile(js_code)
try:
signature = ctx.call("get_sign", md5_param)
return signature
except Exception as e:
print(e)
# 检索关键词拉黑
def char_in_list(string, char_list):
"""
判断字符串中的某个字符是否在列表元素中
:param char: 要检查的字符
:param char_list: 字符列表
:return: 如果字符在列表中则返回True否则返回False
"""
for char in char_list.split('\n'):
if char == '':
continue
if char in string:
if char:
return char
else:
return True
return False
# 查看用户主页信息
def View_user_home(uid, Cookies):
# 访问主页,获取用户信息,只有登录才能获取地区
for __ in range(10):
try:
# session = requests.Session()
# session.mount('https://www.douyin.com', DESAdapter())
url = "https://www.douyin.com/user/" + uid
headers = {
'Connection': 'keep-alive',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
'User-Agent': zhengchang_ua(1),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
'cookie': Cookies
}
data = {'age': None, 'awemeCount': None, 'desc': None, 'enterpriseVerifyReason': None, 'ip': None,
'mplatformFollowersCount': None, "followingCount": None}
response = requests.get(url=url, headers=headers, timeout=1, verify=False)
# print(response.text)
rtext = response.text
if '>私密账号</p>' in rtext:
data['private'] = True
push = re.findall(f'(?:freq_visit_msg|statusMsg).*secUid\\\\":\\\\"{uid}.*?redirectFrom', rtext)
if push:
ipLocation = re.findall('"ipLocation\\\\":\\\\"(.*?)\\\\"', push[0]) # IP归属地
if ipLocation:
# 未知是'$undefined'
if '$undefined' in ipLocation[0]:
data['ip'] = '未知'
else:
data['ipLocation'] = ipLocation[0].replace('IP属地', '')
country = re.findall('"country\\\\":\\\\"(.*?)\\\\"', push[0]) # 城市
province = re.findall('"province\\\\":\\\\"(.*?)\\\\"', push[0]) # 城市
city = re.findall('"city\\\\":\\\\"(.*?)\\\\"', push[0]) # 城市
# 未知是'$undefined'
if '$undefined' in country or not country:
data['country'] = '未知'
elif '$undefined' in province or not province:
data['country'] = country[0]
elif '$undefined' in city or not city:
data['country'] = country[0]
else:
data['country'] = city[0]
age = re.findall('"age\\\\":(.*?),', push[0]) # 年龄
if age:
# 未知是['null']
if 'null' in age[0] or '-1' in age[0]:
data['age'] = '未知'
else:
data['age'] = age[0]
awemeCount = re.findall('"awemeCount\\\\":(.*?),', push[0]) # 作品数量
if awemeCount:
if awemeCount[0] != '':
data['awemeCount'] = awemeCount[0]
# followingCount = re.findall('"followingCount\\\\":(.*?),', push[0]) # 关注数量
# if followingCount:
# if followingCount[0] != '':
# data['followingCount'] = followingCount[0]
mplatformFollowersCount = re.findall('"mplatformFollowersCount\\\\":(.*?),', push[0]) # 粉丝数量
if mplatformFollowersCount:
if mplatformFollowersCount[0] != '':
data['mplatformFollowersCount'] = mplatformFollowersCount[0]
desc = re.findall('"desc\\\\":\\\\"(.*?)\\\\"', push[0]) # 简介
if desc:
if desc[0] != '':
data['desc'] = desc[0]
enterpriseVerifyReason = re.findall('"enterpriseVerifyReason\\\\":\\\\"(.*?)\\\\"', push[0]) # 有内容就是蓝V
if enterpriseVerifyReason:
if enterpriseVerifyReason[0] != '':
data['enterpriseVerifyReason'] = True
return data
except Exception as e:
pass
# print('fwzy',e)
return False
# 年龄区间
def check_blacklist_age(age_inputValue, current_age):
"""检查当前年龄是否在拉黑的年龄段内"""
age_ranges = age_inputValue
if not age_ranges:
return False
# 处理特殊值 "未知"
if current_age == '未知':
return '-1' in age_ranges.split(',')
try:
current_age_num = int(current_age)
except ValueError:
# 非数字且非"未知"的年龄直接返回False
print(f"警告: 无法将年龄 '{current_age}' 转换为数字。") # 可选:记录警告
return False
# 解析年龄段字符串
for range_str in age_ranges.split(','):
range_str = range_str.strip() # 去除可能的空白字符
if not range_str:
continue # 跳过空的字符串(例如连续逗号 , ,
if range_str == '-1':
continue # 已在前面处理过-1
try:
if '-' in range_str:
# 处理年龄段
start_str, end_str = range_str.split('-', 1) # 限制分割次数
start = int(start_str)
end = int(end_str)
# 检查范围是否合法,虽然不是必须,但更严谨
if start > end:
print(f"警告: 无效的年龄范围 '{range_str}' (开始大于结束)。") # 可选:记录警告
continue # 跳过无效范围
if start <= current_age_num <= end:
return True
else:
# 处理单独的年龄值
single_age = int(range_str)
if current_age_num == single_age:
return True
except ValueError:
# 处理无法转换为数字的范围或值
print(f"警告: 无法解析拉黑年龄配置项 '{range_str}'") # 可选:记录警告
continue # 跳过无法解析的项
return False
# 性别拉黑
def Gender_blocking(gender, gender_options):
# 将中文性别转换为英文选项
gender_mapping = {
'': 'male',
'': 'female',
'未知': 'unknown'
}
# 获取对应的英文选项
gender_english = gender_mapping.get(gender, 'unknown')
# 检查是否需要拉黑
if 'all' in gender_options:
# print(f"用户性别为 {gender},被拉黑(全部性别拉黑)")
return f'性别为:{gender}(全部性别拉黑)'
elif gender_english in gender_options:
# print(f"用户性别为 {gender},被拉黑")
return f'性别为:{gender}'
# return True
# print(f"用户性别为 {gender},未被拉黑")
return False
# 分享类型拉黑
def check_share_block(share_target_id, selected_option_keys):
"""
检查一个分享目标ID是否应该被拉黑基于给定的选项列表
Args:
share_target_id: 分享目标的数值ID
selected_option_keys: 配置中选中的分享类型英文键列表 (e.g., ["wechat_moments", "douyin_dm"])
Returns:
如果应该拉黑返回描述拉黑原因的字符串否则返回 False
"""
# 1. 将配置选项转换为集合,提高查找效率
selected_option_set = set(selected_option_keys)
# 2. 检查是否配置了 "all" 选项
if "all" in selected_option_set:
# 如果配置了 "all",则无论具体是什么分享类型,都视为拉黑
# 获取目标ID对应的中文名称用于输出信息
target_config_key = ID_TO_CONFIG_KEY.get(share_target_id)
target_chinese_name = CONFIG_KEY_TO_CHINESE.get(target_config_key, "未知")
return f'分享类型为:{target_chinese_name}(全部类型拉黑)'
# 3. 如果没有配置 "all"查找目标ID对应的配置键
target_config_key = ID_TO_CONFIG_KEY.get(share_target_id)
# 4. 检查目标ID是否已知并且其对应的配置键是否在选中的选项集合中
if target_config_key is not None and target_config_key in selected_option_set:
# 如果目标类型已知且在配置列表中,则视为拉黑
target_chinese_name = CONFIG_KEY_TO_CHINESE.get(
target_config_key) # 此时 target_config_key 肯定在 CONFIG_KEY_TO_CHINESE 中
return f'分享类型为:{target_chinese_name}(拉黑)'
# 5. 如果以上条件都不满足,则不拉黑
return False
# --- 新增:将拉黑记录写入数据库的函数 ---
def insert_blacklist_record(account_url_part, user_sid, user_douyin_id, user_nickname, user_avatar, blacklist_reason):
"""
将一条拉黑记录插入到 blacklist_records.db 数据库
参数:
account_url_part (str): 拉黑该记录的主播账号的 url_part
user_sid (str): 被拉黑用户的抖音主页ID (user_sid)
user_douyin_id (str): 被拉黑用户的抖音号 (display_id)
user_nickname (str): 被拉黑用户的昵称
user_avatar (str): 被拉黑用户的头像URL
blacklist_reason (str): 拉黑原因
"""
print(
f"拉黑.py: 尝试将拉黑记录写入数据库,用户: {user_nickname} ({user_douyin_id}),原因: {blacklist_reason}") # 调试打印
conn = None
try:
# 确定数据库文件路径 (与 login.py 使用相同的逻辑)
root_path = os.getcwd()
is_pack = getattr(sys, 'frozen', False)
if is_pack:
run_path = sys._MEIPASS
else:
run_path = root_path
blacklist_records_path = os.path.join(run_path, 'blacklist_records.db')
conn = sqlite3.connect(blacklist_records_path)
c = conn.cursor()
# 获取当前时间作为拉黑时间
blacklist_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 插入数据
# 使用 INSERT OR IGNORE 来处理 UNIQUE 约束,如果记录已存在,则忽略插入
c.execute("""
INSERT
OR IGNORE INTO blacklist_records
(account_url_part, user_sid, user_douyin_id, user_nickname, user_avatar, blacklist_reason, blacklist_time)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (account_url_part, user_sid, user_douyin_id, user_nickname, user_avatar, blacklist_reason,
blacklist_time))
conn.commit()
print(f"拉黑.py: 已将拉黑记录写入数据库或记录已存在。用户: {user_nickname}") # 调试打印
except sqlite3.Error as e:
print(f"拉黑.py: 数据库插入错误: {e}") # 错误打印
if conn:
conn.rollback() # 发生错误时回滚事务
except Exception as e:
print(f"拉黑.py: 写入拉黑记录时发生未知错误: {e}") # 错误打印
finally:
if conn:
conn.close()
# --- 新增:将日志写入数据库的函数 ---
def insert_log_record(account_url_part, user_sid, user_douyin_id, user_nickname, user_avatar, code, message):
conn = None
try:
# 确定数据库文件路径 (与 login.py 使用相同的逻辑)
root_path = os.getcwd()
is_pack = getattr(sys, 'frozen', False)
if is_pack:
run_path = sys._MEIPASS
else:
run_path = root_path
all_log_path = os.path.join(run_path, 'all_log.db')
conn = sqlite3.connect(all_log_path)
c = conn.cursor()
log_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
c.execute("""
INSERT
OR IGNORE INTO all_log
(account_url_part, user_sid, user_douyin_id, user_nickname, user_avatar, code, message,log_time)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (account_url_part, user_sid, user_douyin_id, user_nickname, user_avatar, code, message,
log_time))
conn.commit()
except sqlite3.Error as e:
print(f"日志.py: 数据库插入错误: {e}") # 错误打印
if conn:
conn.rollback() # 发生错误时回滚事务
except Exception as e:
print(f"日志.py: 写入拉黑记录时发生未知错误: {e}") # 错误打印
finally:
print(f'插入成功')
if conn:
conn.close()
class DouyinLiveWebFetcher:
def __init__(self, live_url, roomId, Cookie, account_url_part):
self.id_dict = {}
self.Cookies = Cookie
self.__ttwid = None
self.room_id = roomId
self.live_url = live_url
self.user_agent = zhengchang_ua(1)
self.account_url_part = account_url_part # 主播账号id
self.ws = None # 添加一个属性来跟踪WebSocket连接
self.is_running = False # 用于跟踪监听状态
# 启动
def start(self):
self._connectWebSocket()
# 结束
def stop(self):
if self.is_running:
self.is_running = False
if self.ws:
self.ws.close() # 关闭 WebSocket 连接
print(f"停止监听账号: {self.account_url_part}")
webview.windows[0].evaluate_js(f"updateButtonState('{self.account_url_part}', false)")
def _connectWebSocket(self):
"""
连接抖音直播间websocket服务器请求直播间数据
"""
wss = (
"wss://webcast5-ws-web-hl.douyin.com/webcast/im/push/v2/?app_name=douyin_web"
"&version_code=180800&webcast_sdk_version=1.0.14-beta.0"
"&update_version_code=1.0.14-beta.0&compress=gzip&device_platform=web&cookie_enabled=true"
"&screen_width=1536&screen_height=864&browser_language=zh-CN&browser_platform=Win32"
"&browser_name=Mozilla"
"&browser_version=5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,"
"%20like%20Gecko)%20Chrome/126.0.0.0%20Safari/537.36"
"&browser_online=true&tz_name=Asia/Shanghai"
"&cursor=d-1_u-1_fh-7392091211001140287_t-1721106114633_r-1"
f"&internal_ext=internal_src:dim|wss_push_room_id:{self.room_id}|wss_push_did:7319483754668557238"
f"|first_req_ms:1721106114541|fetch_time:1721106114633|seq:1|wss_info:0-1721106114633-0-0|"
f"wrds_v:7392094459690748497"
f"&host=https://live.douyin.com&aid=6383&live_id=1&did_rule=3&endpoint=live_pc&support_wrds=1"
f"&user_unique_id=7319483754668557238&im_path=/webcast/im/fetch/&identity=audience"
f"&need_persist_msg_count=15&insert_task_id=&live_reason=&room_id={self.room_id}&heartbeatDuration=0"
)
signature = generateSignature(wss)
wss += f"&signature={signature}"
headers = {
"cookie": f"ttwid={self.ttwid}",
"user-agent": self.user_agent,
}
self.ws = websocket.WebSocketApp(
wss,
header=headers,
on_open=self._wsOnOpen,
on_message=self._wsOnMessage,
on_error=self._wsOnError,
on_close=self._wsOnClose,
)
# 运行websocket连接并处理可能的异常
try:
self.ws.run_forever()
except websocket.WebSocketConnectionClosedException as e:
logging.error("_connectWebSocket--WebSocket连接已关闭: %s", e)
self.stop()
except websocket.WebSocketException as e:
logging.error("_connectWebSocket--WebSocket异常: %s", e)
self.stop()
except Exception as e:
logging.error("_connectWebSocket--发生未知异常: %s", e)
self.stop()
raise # 重新抛出异常,以便上层处理
def _wsOnOpen(self, ws):
"""
连接建立成功
"""
self.is_running = True
logging.info(
f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----直播间链接成功"
)
print('直播间链接成功')
def heartbeat(ws: websocket.WebSocketApp):
"""发送心跳消息的定时任务"""
while True:
if ws.keep_running: # 检查WebSocket连接是否仍在运行
# print(f'心跳')
ws.send("hi") # 发送自定义的心跳消息
time.sleep(15) # 等待指定的时间间隔
else:
# print(f'心跳结束')
break
heartbeat_thread = Thread(target=heartbeat, args=(ws,))
heartbeat_thread.daemon = True # 设置为守护线程
heartbeat_thread.start()
webview.windows[0].evaluate_js(f"updateButtonState('{self.account_url_part}', true)")
def _wsOnMessage(self, ws: websocket.WebSocketApp, message: str):
"""
接收到数据
:param ws: websocket实例
:param message: 数据
"""
# while True:
# 根据proto结构体解析对象
if message:
try:
package = PushFrame().parse(message)
response = Response().parse(gzip.decompress(package.payload))
# 返回直播间服务器链接存活确认消息,便于持续获取数据
if response.need_ack:
ack = PushFrame(
log_id=package.log_id,
payload_type="ack",
payload=response.internal_ext.encode("utf-8"),
).SerializeToString()
ws.send(ack, websocket.ABNF.OPCODE_BINARY)
# 根据消息类别解析消息体
for msg in response.messages_list:
method = msg.method
try:
{
"WebcastChatMessage": self._parseChatMsg, # 聊天消息
"WebcastGiftMessage": self._parseGiftMsg, # 礼物消息
"WebcastLikeMessage": self._parseLikeMsg, # 点赞消息
"WebcastMemberMessage": self._parseMemberMsg, # 进入直播间消息
"WebcastSocialMessage": self._parseSocialMsg, # 关注消息
"WebcastRoomUserSeqMessage": self._parseRoomUserSeqMsg, # 直播间统计
"WebcastFansclubMessage": self._parseFansclubMsg, # 粉丝团消息
"WebcastControlMessage": self._parseControlMsg, # 直播间状态消息
"WebcastEmojiChatMessage": self._parseEmojiChatMsg, # 聊天表情包消息
"WebcastRoomStatsMessage": self._parseRoomStatsMsg, # 直播间统计信息
"WebcastRoomMessage": self._parseRoomMsg, # 直播间信息
"WebcastRoomRankMessage": self._parseRankMsg, # 直播间排行榜信息
}.get(method)(msg.payload)
except Exception as e:
pass
# logging.error(f"_wsOnMessage--error occurred: {e}")
except Exception as e:
logging.error(f"_wsOnMessage--error occurred: {e}")
def _wsOnError(self, ws, error):
# print("WebSocket error: ", error)
logging.info("WebSocket error: ", error)
def _wsOnClose(self, ws: websocket.WebSocketApp, *args):
logging.info(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----直播间断开")
webview.windows[0].evaluate_js(f"updateButtonState('{self.account_url_part}', false)")
# 聊天消息 拉黑功能需要实现 已实现 待测试
def _parseChatMsg(self, payload):
"""聊天消息"""
message = ChatMessage().parse(payload)
user_name = message.user.nick_name # 用户名称
user_id = message.user.id # 用户id
user_sid = message.user.sec_uid # 用户主页uid
display_id = message.user.display_id # 用户抖音ID
content = message.content # 用户发送的信息
user_img = message.user.avatar_thumb # 用户头像
if user_img:
user_img = user_img.url_list_list[0]
print(
f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}-----【聊天msg】- 用户名称:{user_name} - 用户id:{user_id} - 用户主页uid:{user_sid} - 用户抖音ID:{display_id} - 用户发送的信息:{content}"
)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 1, '弹幕:' + content)
# 弹幕拉黑
danmaku_dict = Settings_Options.get('danmaku', {})
if danmaku_dict and danmaku_dict.get('enabled') and danmaku_dict.get('inputValue'):
danmaku_causer = char_in_list(content, danmaku_dict.get('inputValue'))
if danmaku_causer:
causer = f'弹幕存在违规词:{danmaku_causer}'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(
f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
def _parseGiftMsg(self, payload):
"""礼物消息"""
message = GiftMessage().parse(payload)
user_name = message.user.nick_name # 用户名称
gift_name = message.gift.name # 礼物信息
gift_cnt = message.combo_count # 礼物数量
user_sid = message.user.sec_uid # 用户主页uid
display_id = message.user.display_id # 用户抖音ID
user_img = message.user.avatar_thumb # 用户头像
if user_img:
user_img = user_img.url_list_list[0]
content = f'送出了 {gift_name}x{gift_cnt}'
print(
f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}-----【礼物msg】-{user_name} - 送出了 {gift_name}x{gift_cnt}")
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 1, content)
def _parseLikeMsg(self, payload):
"""点赞消息"""
message = LikeMessage().parse(payload)
user_name = message.user.nick_name # 用户名称
count = message.count # 点赞次数
user_sid = message.user.sec_uid # 用户主页uid
display_id = message.user.display_id # 用户抖音ID
user_img = message.user.avatar_thumb # 用户头像
if user_img:
user_img = user_img.url_list_list[0]
content = f'点了{count}个赞'
print(f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}-----【点赞msg】- {user_name} - 点了{count}个赞")
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 1, content)
# 进入直播间信息 拉黑功能需要实现 已实现 待测试
def _parseMemberMsg(self, payload):
"""进入直播间信息"""
message = MemberMessage().parse(payload)
following_coun = message.user.follow_info.following_count # 用户关注数
follower_count = message.user.follow_info.follower_count # 用户粉丝数
user_name = message.user.nick_name # 用户名称
user_sid = message.user.sec_uid # 用户主页uid
user_id = message.user.id # 用户id
user_follow_status = message.user.follow_info.follow_status # 用户关注状态 0未关注 1已关注
user_img = message.user.avatar_thumb # 用户头像
if user_img:
user_img = user_img.url_list_list[0]
display_id = message.user.display_id # 用户抖音ID
gender = ['未知', "", ""][message.user.gender] # 用户性别 0未知 1男 2
# if 'ducking' in user_name:
# print(message)
# input(11111)
# if '蛋仔' in user_name:
print(
f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}-----【进场msg】- 用户名称:{user_name} - 用户id:{user_id} - 用户uid:{user_sid} - 抖音id:{display_id} - 性别:{gender} 进入了直播间"
)
content = f'进入了直播间'
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 0, content)
# with open('user_data.json', 'a', encoding='utf-8') as f:
# f.write(json.dumps({
# 'account_url_part': self.account_url_part,
# 'user_sid': user_sid,
# 'user_douyin_id': display_id,
# 'user_nickname': user_name,
# 'user_avatar': user_img,
# 'blacklist_reason': '进入直播间',
# 'blacklist_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# }, ensure_ascii=False) + ',\n')
# ID 白名单
id_whitelist_dict = Settings_Options.get('id-whitelist', {})
if id_whitelist_dict and id_whitelist_dict.get('enabled') and id_whitelist_dict.get('inputValue'):
if display_id in id_whitelist_dict.get('inputValue'):
print(f'{display_id}存在白名单中 不拉黑')
return
# ID 黑名单
id_blacklist_rule_dict = Settings_Options.get('id-blacklist-rule', {})
# print(id_blacklist_rule_dict)
# print(Settings_Options)
if id_blacklist_rule_dict and id_blacklist_rule_dict.get('enabled') and id_blacklist_rule_dict.get(
'inputValue'):
# blacklist_ids = [uid.strip() for uid in id_blacklist_rule_dict.get('inputValue','').splitlines() if uid.strip()]
# print(f'进入黑名单:{blacklist_ids}')
if display_id in id_blacklist_rule_dict.get('inputValue'):
causer = '黑名单拉黑'
print(f'{display_id}存在黑名单中 拉黑')
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
return
# 本地数据互通 数据互通 拉黑 // 待实现
# 云端数据互通 疑似账号 拉黑 // 待实现
rcfs = message.anchor_display_text.default_patter.replace('{0:user}', '').replace('{1:string} ', '关注')
# 统计用户进入次数
if display_id in self.id_dict:
self.id_dict[display_id] += 1
else:
self.id_dict[display_id] = 1
enter_sum = self.id_dict[display_id]
# 通过分享进入直播间 // 偶尔触发 人少的直播间可以
if '分享' in rcfs:
share_entry_dict = Settings_Options.get('share-entry', {})
if share_entry_dict and share_entry_dict.get('enabled'):
causer = '通过分享进入直播间'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因{causer}")
# 通过关注进入直播间 粉丝进入
if int(user_follow_status) == 1:
follow_entry_dict = Settings_Options.get('follow-entry', {})
if follow_entry_dict and follow_entry_dict.get('enabled'):
causer = '通过关注进入直播间'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因{causer}")
# 性别拉黑
gender_dict = Settings_Options.get("gender", {})
if gender_dict:
if gender_dict.get("enabled") and gender_dict.get("options"):
selected_genders = gender_dict.get("options")
causer = Gender_blocking(gender, selected_genders)
if causer:
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑原因:{causer}")
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
# 粉丝数量大于 拉黑
follower_count_dict = Settings_Options.get("follower-count", {})
if follower_count_dict and follower_count_dict.get('enabled') and follower_count_dict.get('inputValue'):
if int(follower_count) > int(follower_count_dict.get('inputValue')):
causer = f'粉丝数量大于设置的值{follower_count_dict.get("inputValue")}'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
# 关注数量大于 拉黑
following_count_dict = Settings_Options.get("following-count", {})
if following_count_dict and following_count_dict.get('enabled') and following_count_dict.get('inputValue'):
if int(following_coun) > int(following_count_dict.get('inputValue')):
causer = f'关注数量大于设置的值{following_count_dict.get("inputValue")}'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
# 进入次数大于 拉黑
entry_count_dict = Settings_Options.get('entry-count', {})
if entry_count_dict and entry_count_dict.get('enabled') and entry_count_dict.get('inputValue'):
if int(enter_sum) > int(entry_count_dict.get('inputValue')):
causer = f'进入次数大于设置的值{entry_count_dict.get("inputValue")}'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
# 昵称关键词 拉黑
nickname_dict = Settings_Options.get("nickname", {})
if nickname_dict and nickname_dict.get('enabled') and nickname_dict.get('inputValue'):
nickname_causer = char_in_list(user_name, nickname_dict.get('inputValue'))
if nickname_causer:
causer = f'昵称存在违规词:{nickname_causer}'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
# 访问用户主页
'''
判断 地区 个性签名 IP归属地 年龄区间 作品数量 蓝V 私密账号
只要有一个满足条件 则进入查看用户主页信息
'''
if Settings_Options.get('region', {}).get('enabled') or Settings_Options.get('signature', {}).get(
'enabled') or Settings_Options.get('ip', {}).get('enabled') or Settings_Options.get('age-range',
{}).get(
'enabled') or Settings_Options.get('work-count', {}).get('enabled') or Settings_Options.get('blue-v',
{}).get(
'enabled') or Settings_Options.get('private-account', {}).get('enabled'):
user_data = View_user_home(user_sid, self.Cookies)
print(f'进入请求用户主页', user_data)
# 地区 拉黑
region_dict = Settings_Options.get('region', {})
if region_dict and region_dict.get('enabled') and region_dict.get('inputValue'):
region_causer = char_in_list(user_data.get('country', ''), region_dict.get('inputValue'))
if region_causer:
causer = f'地区存在违规词:{region_causer}'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
# 个性签名 拉黑
signature_dict = Settings_Options.get('signature', {})
if signature_dict and signature_dict.get('enabled') and signature_dict.get('inputValue'):
signature_causer = char_in_list(user_data.get('desc', ''), signature_dict.get('inputValue'))
if signature_causer:
causer = f'个性签名存在违规词:{signature_causer}'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
# IP归属地 拉黑
ip_dict = Settings_Options.get('ip', {})
if ip_dict and ip_dict.get('enabled') and ip_dict.get('inputValue'):
if user_data.get('ipLocation', ''):
ip_causer = char_in_list(user_data.get('ipLocation', ''), ip_dict.get('inputValue'))
else:
ip_causer = char_in_list(user_data.get('ip', ''), ip_dict.get('inputValue'))
if ip_causer:
causer = f'IP归属地存在违规词{ip_causer}'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
# 年龄区间 拉黑
age_range_dict = Settings_Options.get('age-range', {})
if age_range_dict and age_range_dict.get('enabled') and age_range_dict.get('inputValue'):
if check_blacklist_age(age_range_dict.get('inputValue'), user_data.get('age', '')):
causer = f'年龄区间存在违规词:{user_data.get('age', '')}'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
# 作品数量大于 拉黑
work_count_dict = Settings_Options.get('work-count', {})
if work_count_dict and work_count_dict.get('enabled') and work_count_dict.get('inputValue'):
if int(user_data.get('awemeCount', '')) > int(work_count_dict.get('inputValue')):
causer = f'作品数量大于设置的值{work_count_dict.get("inputValue")}'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
# 开通蓝v 拉黑
blue_v_dict = Settings_Options.get('blue-v', {})
if blue_v_dict and blue_v_dict.get('enabled') and user_data.get('enterpriseVerifyReason', ''):
causer = f'开通蓝V'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
# 私密账号 拉黑
private_account_dict = Settings_Options.get('private-account', {})
if private_account_dict and private_account_dict.get('enabled') and user_data.get('private', ''):
causer = f'私密账号'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
# 开通橱窗 拉黑 // 没有相关参数
# 疑似账号 拉黑 // 不做
# 关注 分享消息 拉黑功能需要实现 已实现 待测试
def _parseSocialMsg(self, payload):
"""关注 分享消息"""
message = SocialMessage().parse(payload)
user_name = message.user.nick_name # 用户名称
user_sid = message.user.sec_uid # 用户主页uid
user_id = message.user.id # 用户id
display_id = message.user.display_id # 用户抖音ID
# print(f'message.follow_count-->{message.follow_count}')
share_Target = message.share_target # 分享目标
user_img = message.user.avatar_thumb # 用户头像
if user_img:
user_img = user_img.url_list_list[0]
# 分享 拉黑 // 可以确认媒体
if message.follow_count == 0:
print(
f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}-----【分享msg】- 用户名称:{user_name} - 用户id:{user_id} 分享了主播")
content = '分享了主播'
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 1, content)
share_dict = Settings_Options.get("share", {})
if share_dict and share_dict.get("enabled") and share_dict.get("options"):
selected_share_keys = share_dict.get("options")
causer = check_share_block(int(share_Target), selected_share_keys)
if causer:
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
# 关注 拉黑
else:
print(
f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}-----【关注msg】- 用户名称:{user_name} - 用户id:{user_id} 关注了主播")
content = '关注了主播'
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 1, content)
follow_dict = Settings_Options.get("follow", {})
if follow_dict and follow_dict.get("enabled"):
causer = f'关注了主播'
lahei(user_sid, user_id, self.Cookies)
insert_blacklist_record(self.account_url_part, user_sid, display_id, user_name, user_img, causer)
insert_log_record(self.account_url_part, user_sid, display_id, user_name, user_img, 2, causer)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}----用户:{user_name}拉黑成功:原因:{causer}")
def _parseRoomUserSeqMsg(self, payload):
"""直播间统计"""
message = RoomUserSeqMessage().parse(payload)
current = message.total # 当前观看人数
total = message.total_pv_for_anchor # 累计观看人数
print(
f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}-----【统计msg】- 当前观看人数: {current} - 累计观看人数: {total}")
def _parseFansclubMsg(self, payload):
"""粉丝团消息"""
message = FansclubMessage().parse(payload)
content = message.content # 内容
print(f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}-----【粉丝团msg】 {content}")
def _parseControlMsg(self, payload):
"""直播间状态消息"""
message = ControlMessage().parse(payload)
if message.status == 3:
print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}----直播间已结束')
self.stop()
def _parseEmojiChatMsg(self, payload):
"""聊天表情包消息"""
message = EmojiChatMessage().parse(payload)
emoji_id = message.emoji_id # 表情包id
user = message.user # 用户
common = message.common # 通用信息
default_content = message.default_content # 默认内容
print(
f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}-----【聊天表情包id】- {emoji_id} - user{user} - common:{common} - default_content:{default_content}"
)
def _parseRoomStatsMsg(self, payload):
"""直播间统计msg"""
message = RoomStatsMessage().parse(payload)
display_long = message.display_long # 直播间统计信息
print(f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}-----【直播间统计msg】- {display_long}")
def _parseRoomMsg(self, payload):
"""直播间msg"""
message = RoomMessage().parse(payload)
common = message.common
room_id = common.room_id # 直播间id
print(f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}-----【直播间msg】- 直播间id:{room_id}")
def _parseRankMsg(self, payload):
"""直播间排行榜msg"""
message = RoomRankMessage().parse(payload)
ranks_list = message.ranks_list
print(f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}-----【直播间排行榜msg】{ranks_list}")
@property
def ttwid(self):
"""
产生请求头部cookie中的ttwid字段访问抖音网页版直播间首页可以获取到响应cookie中的ttwid
:return: ttwid
"""
if self.__ttwid:
return self.__ttwid
headers = {
"User-Agent": self.user_agent,
}
try:
response = requests.get(
"https://live.douyin.com/", headers=headers, verify=False
)
response.raise_for_status()
except Exception as e:
logging.error(f"ttwid--error occurred: {e}")
else:
self.__ttwid = response.cookies.get("ttwid")
return self.__ttwid
class Api:
def __init__(self):
self.monitor_instances = {} # 用于存储监听实例
# ... (如果不需要 get_localStorage 方法可以删除或保留,保持不变)
def get_localStorage(self, data):
# print(data)
if not os.path.exists(json_path):
open(json_path, 'w', encoding='utf-8').close()
else:
# 写入配置文件
with open(json_path, 'w', encoding='utf-8') as f:
f.write(data)
# print(data)
update_global_settings(data)
# def get_path(self,path,path_json):
# root_path = os.getcwd()
# is_pack = getattr(sys, 'frozen', False)
# if is_pack:
# run_path = sys._MEIPASS
# else:
# run_path = root_path
# path = os.path.join(run_path, 'index', 'user_Settings_Options.json')
# return path
# 触发登录流程并更新界面中的账号列表。
def login_click(self):
"""触发登录流程并更新界面中的账号列表。"""
print('执行登录流程...')
try:
# get_cookie 函数负责打开浏览器、登录、保存 cookie 并更新 cookie_name.txt
# 它也应该负责将 cookie 数据保存到对应的 url_part.pkl 文件中
# 成功返回 True失败返回 False 或抛出异常(由于重试)
success = get_cookie() # 调用 获取cookie.py 中的函数
if success:
print('登录流程成功,正在刷新界面账号列表...')
# 登录成功并文件更新后,刷新界面列表
self.refresh_accounts() # 使用 self 调用类中的另一个方法
return True # 指示登录流程已成功启动(不保证界面刷新已完成)
else:
print('登录流程失败。')
# (可选)通知前端失败
# window.evaluate_js('alert("登录失败,请重试。")')
return False # 指示登录流程失败
except Exception as e:
print(f'登录流程发生错误: {e}')
if '与页面的连接已断开' in str(e):
print('用户取消登录')
return False
# (可选)通知前端出错
# window.evaluate_js(f'alert("登录发生错误: {e}")')
return False # 指示登录流程因错误失败
# 从文件读取账号并发送到前端刷新表格。
def refresh_accounts(self):
"""从文件读取账号并发送到前端刷新表格。"""
print("正在刷新账号列表...")
accounts = read_accounts_from_file()
print(f"refresh_accounts: 读取到 {len(accounts)} 个账号,发送到前端。")
# 将账号数据发送到 JavaScript 函数 updateAccountTable
# 使用 json.dumps 将字典列表序列化为 JSON 字符串
# 确保 JS 函数 updateAccountTable 存在并准备好接收此数据
# 'window' 在 pywebview 的 Api 方法上下文中自动可用
window.evaluate_js(f'updateAccountTable({json.dumps(accounts)})')
print("refresh_accounts: 已发送账号数据到前端进行刷新。")
# 页面加载时由 JS 调用,获取初始账号列表。
def get_initial_accounts(self):
"""页面加载时由 JS 调用,获取初始账号列表。"""
print("前端请求获取初始账号列表...")
self.refresh_accounts() # 只需调用 refresh_accounts 进行读取和发送
# --- 新增 删除账号方法 ---
def delete_account(self, url_part):
"""
删除指定 url_part 的账号信息及其对应的 cookie 数据文件
参数: url_part (str) - 要删除账号的 url_part (例如 '76861274466')
返回: bool - 成功删除任一文件返回 True, 都未找到返回 False
"""
print(f"Api: 接收到删除请求url_part: {url_part}")
account_found_in_txt = False
cookie_file_deleted = False
try:
# 1. 删除 cookie_name.txt 中的对应行
lines_to_keep = []
txt_file_exists = os.path.isfile('cookie_name.txt')
if txt_file_exists:
try:
with open('cookie_name.txt', 'r', encoding='utf-8') as f:
for line in f:
stripped_line = line.strip()
if stripped_line:
parts = stripped_line.split('|')
# 检查第一个部分是否与要删除的 url_part 匹配
if len(parts) >= 1 and parts[0] == url_part:
print(f"Api: 在 cookie_name.txt 中找到并标记删除行: '{stripped_line}'")
account_found_in_txt = True
else:
lines_to_keep.append(line) # 保留不匹配的行 (包括原始换行符)
# 如果找到了要删除的行,则重写文件
if account_found_in_txt:
with open('cookie_name.txt', 'w', encoding='utf-8') as f:
f.writelines(lines_to_keep)
print(f"Api: 已从 cookie_name.txt 中删除 url_part 为 {url_part} 的行并重写文件。")
else:
print(f"Api: 未在 cookie_name.txt 中找到 url_part 为 {url_part} 的行。")
except Exception as e:
print(f"Api: 处理 cookie_name.txt 文件时发生错误: {e}")
else:
print(f"Api: 文件 cookie_name.txt 不存在,跳过文本文件删除。")
# 2. 删除对应的 .pkl 文件
pkl_filename = f'{url_part}.pkl' # 根据 url_part 构建文件名
pkl_file_exists = os.path.isfile(pkl_filename)
if pkl_file_exists:
try:
os.remove(pkl_filename)
cookie_file_deleted = True
print(f"Api: 已删除 cookie 文件: {pkl_filename}")
except Exception as e:
print(f"Api: 删除文件 {pkl_filename} 时出错: {e}")
else:
print(f"Api: 未找到 cookie 文件: {pkl_filename},跳过文件删除。")
# 判断是否至少在一个地方文本文件或pkl文件进行了删除操作
success = account_found_in_txt or cookie_file_deleted
if success:
print(f"Api: url_part 为 {url_part} 的账号删除处理完成。")
else:
print(f"Api: url_part 为 {url_part} 的账号未在任何文件中找到。")
except Exception as e:
# 捕获其他可能的异常
print(f"Api: 删除账号 {url_part} 时发生总体错误: {e}")
success = False
# 前端会根据返回的 success 标志来决定是否调用 refreshTable()
return success
# --- 如果以后需要,添加其他操作方法 ---
# 监控账号
def monitor_account(self, url_part, username):
if url_part in self.monitor_instances:
print(f"账号 {url_part} 正在监听中,不能重复监听。")
return False
print(f"正在监控 url_part 为 {url_part} 的账号")
# 实现开始监控的逻辑
user_name = username
Cookie = load_cookies_from_file(f'{url_part}.pkl')
cookies_dict = {c.name: c.value for c in Cookie}
cookie_str = "; ".join(f"{k}={v}" for k, v in cookies_dict.items())
url = 'https://www.douyin.com/user/self?from_tab_name=live' # 主播主页
print(f'进入')
# url = 'https://www.douyin.com/user/MS4wLjABAAAAc9m4vmspNsSiYP348XRC-OLVqSXRn8Rdtl618fbzYag' # 测试链接
headers = {
'Connection': 'keep-alive',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
}
res = session.get(url, headers=headers, cookies=Cookie)
print('开始第一次请求')
while True:
if not res.status_code == 200:
print(datetime.now(), '请求失败,重试一次')
# app_edit(f'{datetime.now()}----请求失败,重试一次')
# 循环查询是否有开播的信息
res = session.get(url, headers=headers, cookies=Cookie)
# with open('index.html', 'w', encoding='utf-8') as f:
# f.write(res.text)
if 'const verify_data' in res.text:
print('进入直播中 const verify_data')
print(datetime.now(), f'{user_name}--Cookie失效')
if '直播中</span>' in res.text:
print('进入直播中')
# with open('index.html', 'w', encoding='utf-8') as f:
# f.write(res.text)
zburl = re.findall(r'<a href="(https://live.douyin.com/.*?)(?:\?action|")', res.text)
# print(zburl)
# zburl = re.findall(r'<a href="(?:https://live.douyin.com/\d+)"',res.text)
print(zburl[1])
if zburl:
print(datetime.now(), f'{user_name}--检查到已开播')
roomId = get_roomId(zburl[1], Cookie)
print(roomId)
# input(11111)
# print(roomId)
# app_edit(f'{datetime.now()}----检查到已开播')
# return zburl[0]
# return zburl[1] # 测试
# DouyinLiveWebFetcher(zburl[1], roomId[0], cookie_str, url_part).start()
# 创建监听实例
fetcher = DouyinLiveWebFetcher(zburl[1], roomId[0], cookie_str, url_part)
# 存储监听实例
self.monitor_instances[url_part] = fetcher
# 启动监听
fetcher.start()
print(f"成功启动账号 {url_part} 的监听流程。")
return True
return
time.sleep(2)
print(datetime.now(), f'{user_name}--未直播等待2秒后再次查询')
# 新增:从数据库获取拉黑记录的方法
def get_blacklist_records(self, account_url_part):
"""
从数据库获取指定主播账号的拉黑记录
参数: account_url_part (str) - 主播账号的 url_part
返回: str - 拉黑记录列表的 JSON 字符串
"""
print(f"Api: 接收到获取拉黑记录请求,账号 url_part: {account_url_part}") # 调试打印
conn = None
records = []
try:
# 使用与主程序相同的数据库路径
root_path = os.getcwd()
is_pack = getattr(sys, 'frozen', False)
if is_pack:
run_path = sys._MEIPASS
else:
run_path = root_path
blacklist_records_path = os.path.join(run_path, 'blacklist_records.db')
conn = sqlite3.connect(blacklist_records_path)
c = conn.cursor()
# 查询指定账号的拉黑记录,按拉黑时间倒序排列
# 注意:这里假设数据库字段名和前端期望的字段名映射关系
c.execute("""
SELECT id,
user_sid,
user_douyin_id,
user_nickname,
user_avatar,
blacklist_reason,
blacklist_time
FROM blacklist_records
WHERE account_url_part = ?
ORDER BY blacklist_time DESC
""", (account_url_part,))
# 将查询结果转换为前端期望的字典格式
rows = c.fetchall()
for row in rows:
records.append({
'id': row[0], # 数据库的 id
'userId': row[1], # user_sid 映射到前端的 userId
'douyinId': row[2], # user_douyin_id 映射到前端的 douyinId
'nickname': row[3], # user_nickname 映射到前端的 nickname
'avatar': row[4], # user_avatar 映射到前端的 avatar
'reason': row[5], # blacklist_reason 映射到前端的 reason
'time': row[6] # blacklist_time 映射到前端的 time
})
print(f"Api: 查询到 {len(records)} 条拉黑记录。") # 调试打印
except sqlite3.Error as e:
print(f"Api: 数据库查询错误: {e}") # 错误打印
# 发生错误时返回空列表或错误指示
records = [] # 确保发生错误时返回空列表
except Exception as e:
print(f"Api: 获取拉黑记录时发生未知错误: {e}") # 错误打印
records = [] # 确保发生错误时返回空列表
finally:
if conn:
conn.close()
# 返回 JSON 字符串
return json.dumps(records)
# 新增方法:强制从数据库刷新拉黑记录并更新缓存
def refresh_blacklist_records(self, account_url_part):
"""
强制从数据库刷新指定主播账号的拉黑记录更新缓存并返回最新数据
参数: account_url_part (str) - 主播账号的 url_part
返回: str - 拉黑记录列表的 JSON 字符串
"""
print(f"Api: 接收到强制刷新拉黑记录请求,账号 url_part: {account_url_part}") # 调试打印
# 设置强制刷新标志
self._force_refresh = True
# 调用 get_blacklist_records由于设置了标志它会从数据库读取
return self.get_blacklist_records(account_url_part)
# 新增方法根据记录ID从数据库删除拉黑记录
def delete_blacklist_record_by_id(self, record_id):
"""
根据数据库记录ID删除单条拉黑记录
参数: record_id (str) - 要删除的拉黑记录的数据库ID
返回: bool - 删除成功返回 True失败返回 False
"""
print(f"Api: 接收到删除单条拉黑记录请求记录ID: {record_id}") # 调试打印
conn = None
try:
root_path = os.getcwd()
is_pack = getattr(sys, 'frozen', False)
if is_pack:
run_path = sys._MEIPASS
else:
run_path = root_path
blacklist_records_path = os.path.join(run_path, 'blacklist_records.db')
conn = sqlite3.connect(blacklist_records_path)
c = conn.cursor()
# 执行删除操作
c.execute("DELETE FROM blacklist_records WHERE id = ?", (record_id,))
conn.commit()
# 检查是否删除了记录
deleted_count = c.rowcount
print(f"Api: 删除拉黑记录 (ID: {record_id})。影响行数: {deleted_count}") # 调试打印
# 如果使用后端缓存,删除记录后需要更新缓存
# TODO: 根据删除的 record_id 找到对应的 account_url_part并更新 self._blacklist_cache
# 这需要从数据库中查询 record_id 对应的 account_url_part或者修改前端传入 account_url_part
return deleted_count > 0 # 如果删除了至少一行,返回 True
except sqlite3.Error as e:
print(f"Api: 数据库删除错误 (ID: {record_id}): {e}") # 错误打印
return False # 数据库错误返回 False
except Exception as e:
print(f"Api: 删除拉黑记录 (ID: {record_id}) 时发生未知错误: {e}") # 错误打印
return False
finally:
if conn:
conn.close()
# 新增方法根据记录ID列表从数据库批量删除拉黑记录
def batch_delete_blacklist_records_by_ids(self, record_ids):
"""
根据数据库记录ID列表批量删除拉黑记录
参数: record_ids (list) - 要删除的拉黑记录ID列表
返回: bool - 所有删除操作都成功返回 True任一失败返回 False
(或者可以返回成功删除的数量)
"""
print(f"Api: 接收到批量删除拉黑记录请求记录IDs: {record_ids}") # 调试打印
if not record_ids:
print("Api: 批量删除请求 IDs 为空,未执行删除操作。") # 调试打印
return True # 空列表视为成功处理
conn = None
success_count = 0
try:
root_path = os.getcwd()
is_pack = getattr(sys, 'frozen', False)
if is_pack:
run_path = sys._MEIPASS
else:
run_path = root_path
blacklist_records_path = os.path.join(run_path, 'blacklist_records.db')
conn = sqlite3.connect(blacklist_records_path)
c = conn.cursor()
# 批量执行删除操作
# 使用事务以提高效率和确保原子性
conn.execute('BEGIN TRANSACTION;')
for record_id in record_ids:
try:
c.execute("DELETE FROM blacklist_records WHERE id = ?", (record_id,))
success_count += c.rowcount
print(f"Api: 尝试删除记录 (ID: {record_id})。影响行数: {c.rowcount}") # 调试打印
except sqlite3.Error as e:
print(f"Api: 数据库删除错误 (ID: {record_id}) 在批量操作中: {e}") # 错误打印
# 可以在这里选择是否回滚整个事务或者记录失败并继续
# 为了简单,这里记录错误并继续
conn.commit()
print(f"Api: 批量删除操作完成。成功删除 {success_count} 条记录。") # 调试打印
# 如果使用后端缓存,批量删除后需要更新缓存
# 这比较复杂可能需要遍历删除的IDs找到涉及的 account_url_part 并刷新对应缓存
return success_count == len(record_ids) # 如果成功删除的数量等于请求的数量,返回 True
except sqlite3.Error as e:
print(f"Api: 数据库批量删除操作发生错误: {e}") # 错误打印
if conn:
conn.rollback() # 回滚事务
return False # 数据库错误返回 False
except Exception as e:
print(f"Api: 批量删除拉黑记录时发生未知错误: {e}") # 错误打印
return False
finally:
if conn:
conn.close()
# 新增方法:执行解除拉黑操作(调用 拉黑.py 中的 jclahei
def perform_unblock(self, account_url_part, user_sid):
"""
调用 拉黑.py 中的 jclahei 方法执行解除拉黑操作
参数:
account_url_part (str): 当前主播账号的 url_part
user_sid (str): 要解除拉黑用户的 user_sid (抖音主页ID)
返回: str - 解除拉黑操作的结果消息 (成功或失败原因)
"""
print(
f"Api: 接收到执行解除拉黑操作请求。账号 url_part: {account_url_part}, 用户 user_sid: {user_sid}") # 调试打印
cookie_file_path = f'./{account_url_part}.pkl'
try:
# 加载当前主播账号的 Cookie
Cookie = load_cookies_from_file(cookie_file_path)
if not Cookie:
print(f"Api: perform_unblock: 未找到或加载 cookie 文件失败: {cookie_file_path}") # 错误打印
return "加载账号 Cookie 失败"
# 调用 拉黑.py 中的 jclahei 方法
# print(f"Api: 调用 jclahei 方法参数user_sid={user_sid}, Cookie={Cookie}") # 调试打印
# 注意jclahei 方法中的 user_id 参数在您的 拉黑.py 中硬编码了一个值
# jclahei(sec_user_id, Cookie, user_id='7454438944119719433')
# 您需要修改 拉黑.py 中的 jclahei 方法,使其正确接收 user_id 参数,或者确认 user_id 是否必须以及如何获取。
# 假设 user_sid 就是 jclahei 需要的 sec_user_id并且 user_id 参数是可选或可以从 user_sid 推导。
# 如果 jclahei 需要 user_id您需要从数据库或其他地方获取它或者修改数据库存储 user_id
# 目前根据您的描述jclahei 需要 user_sid 和 cookie。我们使用 user_sid 作为 sec_user_id 传入。
# 关于 user_id 参数,根据 拉黑.py 代码,它有一个默认值 '7454438944119719433'。
# 如果这个默认值不正确,或者需要传入实际被拉黑用户的 user_id您需要调整这里的逻辑。
# 检查 拉黑.py 中的 jclahei 实现,它似乎是 POST 到 /aweme/v1/web/user/block/
# payload 中包含 block_type=0解除拉黑、sec_user_id 和 user_id。
# sec_user_id 应该是被拉黑用户的 user_siduser_id 是被拉黑用户的数字ID。
# 如果您的数据库中只存储了 user_sid您可能需要修改 jclahei 或从数据库获取 user_id。
# **重要:请仔细检查 拉黑.py 中 jclahei 方法需要的 user_id 参数是如何获取和使用的。**
# **根据您的 `拉黑.py` 代码jclahei 方法签名是 `jclahei(sec_user_id, Cookie, user_id='7454438944119719433')`**
# **这里的 `user_id` 参数有一个硬编码的默认值。这可能不是您期望的。**
# **您需要确保 `jclahei` 使用传入的 `sec_user_id` (即被拉黑用户的 user_sid) 来正确执行解除拉黑。**
# **并且确认是否需要正确的 `user_id`,如果需要,您需要修改数据库存储 user_id并在获取拉黑记录时一同返回。**
# **以下代码假设 jclahei 可以仅通过 sec_user_id 和 Cookie 来解除拉黑。**
# 临时代码:使用硬编码的 user_id 调用 jclahei您需要根据实际情况修改
# result = jclahei(user_sid, Cookie, user_id='7454438944119719433')
# 更好的方法:如果数据库存储了 user_id (即前端的 item.userId),获取它并传入
# 这需要修改 get_blacklist_records 方法,让它也返回 user_id
# 在 id-blacklist.html 中,我在创建列表项时已经将 item.userId 存储在 data-user-id 属性中
# 前端调用时应该传入这个 user_id
# 假设前端会传入 record_id 和 user_sid
# 如果 jclahei 只需要 user_sid 和 Cookie那么直接调用
result = jclahei(user_sid, Cookie)
print(f"Api: jclahei 方法调用结果: {result}") # 调试打印
# 根据 jclahei 的返回值判断操作结果
if result is True:
print(f"Api: 解除拉黑成功,用户 user_sid: {user_sid}") # 调试打印
return "解除拉黑成功"
else:
print(f"Api: 解除拉黑失败或需要二次确认。结果: {result}") # 调试打印
# jclahei 可能返回错误信息或状态码,这里需要根据实际返回值处理
if isinstance(result, str):
return f"解除拉黑失败: {result}"
elif isinstance(result, int):
return f"解除拉黑失败: HTTP状态码 {result}"
else:
return "解除拉黑失败: 未知错误"
except FileNotFoundError:
print(f"Api: perform_unblock: 未找到 cookie 文件: {cookie_file_path}") # 错误打印
return "加载账号 Cookie 失败"
except Exception as e:
print(f"Api: perform_unblock: 调用 jclahei 时发生错误: {e}") # 错误打印
return f"解除拉黑操作出错: {e}"
# 新增方法:执行批量解除拉黑操作
def perform_batch_unblock(self, account_url_part, user_sid_list):
"""
批量调用 拉黑.py 中的 jclahei 方法执行解除拉黑操作
参数:
account_url_part (str): 当前主播账号的 url_part
user_sid_list (list): 要解除拉黑用户的 user_sid 列表
返回: str - 批量解除拉黑操作的结果消息
"""
print(
f"Api: 接收到执行批量解除拉黑操作请求。账号 url_part: {account_url_part}, 用户 user_sid 列表: {user_sid_list}") # 调试打印
cookie_file_path = f'./{account_url_part}.pkl'
results = {} # 存储每个用户的解除结果
try:
# 加载当前主播账号的 Cookie
Cookie = load_cookies_from_file(cookie_file_path)
if not Cookie:
print(f"Api: perform_batch_unblock: 未找到或加载 cookie 文件失败: {cookie_file_path}") # 错误打印
return json.dumps({"success": False, "message": "加载账号 Cookie 失败"})
# 遍历用户列表,逐个调用 jclahei
for user_sid in user_sid_list:
print(f"Api: 尝试解除拉黑用户 user_sid: {user_sid}") # 调试打印
# 调用 拉黑.py 中的 jclahei 方法,同样需要注意 user_id 参数的问题
# 临时代码:使用硬编码的 user_id 调用 jclahei您需要根据实际情况修改
# result = jclahei(user_sid, Cookie, user_id='7454438944119719433')
# 如果 jclahei 只需要 user_sid 和 Cookie那么直接调用
result = jclahei(user_sid, Cookie)
results[user_sid] = result # 存储结果
# 可以添加一个短暂的延迟,避免请求过快
time.sleep(0.5) # 例如延迟0.5秒
print(f"Api: 批量解除拉黑操作完成。结果: {results}") # 调试打印
# 统计成功和失败数量,构建返回消息
success_count = sum(1 for r in results.values() if r is True)
failed_users = [uid for uid, r in results.items() if r is not True]
if not failed_users:
message = f"成功解除拉黑 {success_count} 个用户。"
return json.dumps({"success": True, "message": message})
else:
message = f"成功解除拉黑 {success_count} 个用户,{len(failed_users)} 个用户失败。"
# 可以在消息中包含失败用户的 user_sid 或错误详情,但这可能暴露过多信息,按需调整
return json.dumps({"success": False, "message": message, "failed_users": failed_users})
except FileNotFoundError:
print(f"Api: perform_batch_unblock: 未找到 cookie 文件: {cookie_file_path}") # 错误打印
return json.dumps({"success": False, "message": "加载账号 Cookie 失败"})
except Exception as e:
print(f"Api: perform_batch_unblock: 调用 jclahei 时发生错误: {e}") # 错误打印
return json.dumps({"success": False, "message": f"批量解除拉黑操作出错: {e}"})
# 停止监听
def stop_monitoring(self, url_part):
"""
停止指定 url_part 账号的监听
参数: url_part (str) - 要停止监听的账号的 url_part
返回: bool - 成功停止返回 True失败返回 False
"""
try:
if url_part in self.monitor_instances:
instance = self.monitor_instances[url_part]
instance.stop()
del self.monitor_instances[url_part]
print(f"成功停止账号 {url_part} 的监听流程。")
return True
else:
print(f"未找到 url_part 为 {url_part} 的监听实例")
return False
except Exception as e:
print(f"停止监听时发生错误: {e}")
return False
if __name__ == '__main__':
root_path = os.getcwd()
is_pack = getattr(sys, 'frozen', False)
if is_pack:
run_path = sys._MEIPASS
else:
run_path = root_path
json_path = os.path.join(run_path, 'index', 'user_Settings_Options.json')
if not os.path.exists(json_path):
with open(json_path, 'w', encoding='utf-8') as f:
f.write(
'{"id-whitelist":{"name":"id白名单","enabled":false,"option":"","inputValue":""},"id-blacklist-rule":{"name":"id黑名单","enabled":false,"inputValue":""},"danmaku":{"name":"弹幕关键词拉黑","enabled":false,"inputValue":""},"local-data":{"name":"数据互通","enabled":false},"suspected-account":{"name":"疑似账号","enabled":false},"follow":{"name":"关注拉黑","enabled":false},"share-entry":{"name":"通过分享进入直播间","enabled":false},"follow-entry":{"name":"通过关注进入直播间","enabled":false},"private-account":{"name":"私密账号","enabled":false},"blue-v":{"name":"开通蓝v","enabled":false},"share":{"name":"分享拉黑","enabled":false,"options":[]},"gender":{"name":"性别拉黑","enabled":false,"options":[]},"age-range":{"name":"年龄区间拉黑","enabled":false,"inputValue":""},"follower-count":{"name":"粉丝数量大于拉黑","enabled":false,"inputValue":""},"following-count":{"name":"关注数量大于拉黑","enabled":false,"inputValue":""},"work-count":{"name":"作品数量大于拉黑","enabled":false,"inputValue":""},"entry-count":{"name":"用户进入次数拉黑","enabled":false,"inputValue":""},"nickname":{"name":"昵称关键词","enabled":false,"inputValue":""},"signature":{"name":"个性签名关键词拉黑","enabled":false,"inputValue":""},"region":{"name":"地区拉黑","enabled":false,"inputValue":""},"ip":{"name":"IP拉黑","enabled":false,"inputValue":""}}')
blacklist_records_path = os.path.join(run_path, 'blacklist_records.db')
# 创建数据库 如果存在则不创建
if not os.path.exists(blacklist_records_path):
conn = sqlite3.connect(blacklist_records_path)
c = conn.cursor()
# 创建表
c.execute('''CREATE TABLE blacklist_records
(
id INTEGER PRIMARY KEY AUTOINCREMENT, -- 自增主键
account_url_part TEXT NOT NULL, -- 拉黑该记录的主播账号的 url_part (sec_uid)
user_sid TEXT NOT NULL, -- 被拉黑用户的抖音主页ID (user_sid)
user_douyin_id TEXT, -- 被拉黑用户的抖音号 (display_id)
user_nickname TEXT, -- 被拉黑用户的昵称
user_avatar TEXT, -- 被拉黑用户的头像URL
blacklist_reason TEXT, -- 拉黑原因
blacklist_time TEXT, -- 拉黑时间 (YYYY-MM-DD HH:MM:SS 格式文本)
-- 联合唯一约束同一个主播账号不能重复拉黑同一个用户 (通过 user_sec_uid 判断)
UNIQUE (account_url_part, user_sid)
)''')
conn.commit()
conn.close()
all_log_path = os.path.join(run_path, 'all_log.db')
if not os.path.exists(all_log_path):
conn = sqlite3.connect(all_log_path)
c = conn.cursor()
c.execute('''CREATE TABLE all_log
(
id INTEGER PRIMARY KEY AUTOINCREMENT, -- 自增主键
account_url_part TEXT NOT NULL, -- 该日志的主播账号的 url_part (sec_uid)
user_sid TEXT NOT NULL, -- 该条日志用户的抖音主页ID (user_sid)
user_douyin_id TEXT, -- 该条日志黑用户的抖音号 (display_id)
user_nickname TEXT, -- 该条日志用户的昵称
user_avatar TEXT, -- 该条日志用户的头像URL
code INTEGER, -- 该条日志的状态码 0 进入日志 1 互动日志 2 拉黑日志
message TEXT, -- 该条日志的状态信息
log_time TEXT -- 该条日志的时间 (YYYY-MM-DD HH:MM:SS 格式文本)
);
''')
conn.commit()
conn.close()
api = Api()
window_html = os.path.join(run_path, 'index', 'live-management.html')
# window_html = os.path.join(run_path, 'index', 'app.html')
# window_html = os.path.join(run_path, 'index', 'main.html')
# 在 create_window 中设置 api 对象,以便 JS 访问
2026-03-26 08:38:46 +08:00
window = webview.create_window('直播管理系统', window_html, js_api=api, min_size=(1200, 800))
2025-06-03 18:23:22 +08:00
# 启动 webview。启动后界面将显示。
# live-management.html 中的 JS 代码应在 DOM 准备就绪且 pywebview api 可用后调用 get_initial_accounts()
2026-03-26 08:38:46 +08:00
webview.start(debug=False)