liufuhua007

add code

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
# ignore cookie file
tencent_uploader/*.json
youtube_uploader/*.json
douyin_uploader/*.json
bilibili_uploader/*.json
tk_uploader/*.json
cookies
\ No newline at end of file
# xj-marketing
social-auto-upload 该项目旨在自动化发布视频到各个社交媒体平台
xj-marketing 该项目旨在自动化发布视频到各个社交媒体平台
social-auto-upload This project aims to automate the posting of videos to various social media platforms.
xj-marketing This project aims to automate the posting of videos to various social media platforms.
## 💡Feature
......
import argparse
import asyncio
from datetime import datetime
from os.path import exists
from pathlib import Path
from conf import BASE_DIR
from uploader.douyin_uploader.main import douyin_setup, DouYinVideo
from uploader.ks_uploader.main import ks_setup, KSVideo
from uploader.tencent_uploader.main import weixin_setup, TencentVideo
from uploader.tk_uploader.main_chrome import tiktok_setup, TiktokVideo
from uploader.wyh_uploader.main import wyh_setup
from utils.base_social_media import get_supported_social_media, get_cli_action, SOCIAL_MEDIA_DOUYIN, \
SOCIAL_MEDIA_TENCENT, SOCIAL_MEDIA_TIKTOK, SOCIAL_MEDIA_KUAISHOU, SOCIAL_MEDIA_WANGYIHAO
from utils.constant import TencentZoneTypes
from utils.files_times import get_title_and_hashtags
def parse_schedule(schedule_raw):
if schedule_raw:
schedule = datetime.strptime(schedule_raw, '%Y-%m-%d %H:%M')
else:
schedule = None
return schedule
async def main():
# 主解析器
parser = argparse.ArgumentParser(description="Upload video to multiple social-media.")
parser.add_argument("platform", metavar='platform', choices=get_supported_social_media(), help="Choose social-media platform: douyin tencent tiktok kuaishou")
parser.add_argument("account_name", type=str, help="Account name for the platform: xiaoA")
subparsers = parser.add_subparsers(dest="action", metavar='action', help="Choose action", required=True)
actions = get_cli_action()
for action in actions:
action_parser = subparsers.add_parser(action, help=f'{action} operation')
if action == 'login':
# Login 不需要额外参数
continue
elif action == 'upload':
action_parser.add_argument("video_file", help="Path to the Video file")
action_parser.add_argument("-pt", "--publish_type", type=int, choices=[0, 1],
help="0 for immediate, 1 for scheduled", default=0)
action_parser.add_argument('-t', '--schedule', help='Schedule UTC time in %Y-%m-%d %H:%M format')
# 解析命令行参数
args = parser.parse_args()
# 参数校验
if args.action == 'upload':
if not exists(args.video_file):
raise FileNotFoundError(f'Could not find the video file at {args["video_file"]}')
if args.publish_type == 1 and not args.schedule:
parser.error("The schedule must must be specified for scheduled publishing.")
account_file = Path(BASE_DIR / "cookies" / f"{args.platform}_{args.account_name}.json")
account_file.parent.mkdir(exist_ok=True)
print(account_file)
# 根据 action 处理不同的逻辑
if args.action == 'login':
print(f"Logging in with account {args.account_name} on platform {args.platform}")
if args.platform == SOCIAL_MEDIA_DOUYIN:
await douyin_setup(str(account_file), handle=True)
elif args.platform == SOCIAL_MEDIA_TIKTOK:
await tiktok_setup(str(account_file), handle=True)
elif args.platform == SOCIAL_MEDIA_TENCENT:
await weixin_setup(str(account_file), handle=True)
elif args.platform == SOCIAL_MEDIA_KUAISHOU:
await ks_setup(str(account_file), handle=True)
elif args.platform == SOCIAL_MEDIA_WANGYIHAO:
await wyh_setup(str(account_file), handle=True)
elif args.action == 'upload':
title, tags = get_title_and_hashtags(args.video_file)
video_file = args.video_file
if args.publish_type == 0:
print("Uploading immediately...")
publish_date = 0
else:
print("Scheduling videos...")
publish_date = parse_schedule(args.schedule)
if args.platform == SOCIAL_MEDIA_DOUYIN:
await douyin_setup(account_file, handle=False)
app = DouYinVideo(title, video_file, tags, publish_date, account_file)
elif args.platform == SOCIAL_MEDIA_TIKTOK:
await tiktok_setup(account_file, handle=True)
app = TiktokVideo(title, video_file, tags, publish_date, account_file)
elif args.platform == SOCIAL_MEDIA_TENCENT:
await weixin_setup(account_file, handle=True)
category = TencentZoneTypes.LIFESTYLE.value # 标记原创需要否则不需要传
app = TencentVideo(title, video_file, tags, publish_date, account_file, category)
elif args.platform == SOCIAL_MEDIA_KUAISHOU:
await ks_setup(account_file, handle=True)
app = KSVideo(title, video_file, tags, publish_date, account_file)
else:
print("Wrong platform, please check your input")
exit()
await app.main()
if __name__ == "__main__":
asyncio.run(main())
from pathlib import Path
BASE_DIR = Path(__file__).parent.resolve()
XHS_SERVER = "http://127.0.0.1:11901"
LOCAL_CHROME_PATH = "" # change me necessary! for example C:/Program Files/Google/Chrome/Application/chrome.exe

9.44 KB

from pathlib import Path
from conf import BASE_DIR
Path(BASE_DIR / "cookies").mkdir(exist_ok=True)
\ No newline at end of file
from pathlib import Path
from conf import BASE_DIR
Path(BASE_DIR / "cookies" / "bilibili_uploader").mkdir(exist_ok=True)
\ No newline at end of file
This file is too large to display.
import json
import pathlib
import random
from biliup.plugins.bili_webup import BiliBili, Data
from utils.log import bilibili_logger
def extract_keys_from_json(data):
"""Extract specified keys from the provided JSON data."""
keys_to_extract = ["SESSDATA", "bili_jct", "DedeUserID__ckMd5", "DedeUserID", "access_token"]
extracted_data = {}
# Extracting cookie data
for cookie in data['cookie_info']['cookies']:
if cookie['name'] in keys_to_extract:
extracted_data[cookie['name']] = cookie['value']
# Extracting access_token
if "access_token" in data['token_info']:
extracted_data['access_token'] = data['token_info']['access_token']
return extracted_data
def read_cookie_json_file(filepath: pathlib.Path):
with open(filepath, 'r', encoding='utf-8') as file:
content = json.load(file)
return content
def random_emoji():
emoji_list = ["🍏", "🍎", "🍊", "🍋", "🍌", "🍉", "🍇", "🍓", "🍈", "🍒", "🍑", "🍍", "🥭", "🥥", "🥝",
"🍅", "🍆", "🥑", "🥦", "🥒", "🥬", "🌶", "🌽", "🥕", "🥔", "🍠", "🥐", "🍞", "🥖", "🥨", "🥯", "🧀", "🥚", "🍳", "🥞",
"🥓", "🥩", "🍗", "🍖", "🌭", "🍔", "🍟", "🍕", "🥪", "🥙", "🌮", "🌯", "🥗", "🥘", "🥫", "🍝", "🍜", "🍲", "🍛", "🍣",
"🍱", "🥟", "🍤", "🍙", "🍚", "🍘", "🍥", "🥮", "🥠", "🍢", "🍡", "🍧", "🍨", "🍦", "🥧", "🍰", "🎂", "🍮", "🍭", "🍬",
"🍫", "🍿", "🧂", "🍩", "🍪", "🌰", "🥜", "🍯", "🥛", "🍼", "☕️", "🍵", "🥤", "🍶", "🍻", "🥂", "🍷", "🥃", "🍸", "🍹",
"🍾", "🥄", "🍴", "🍽", "🥣", "🥡", "🥢"]
return random.choice(emoji_list)
class BilibiliUploader(object):
def __init__(self, cookie_data, file: pathlib.Path, title, desc, tid, tags, dtime):
self.upload_thread_num = 3
self.copyright = 1
self.lines = 'AUTO'
self.cookie_data = cookie_data
self.file = file
self.title = title
self.desc = desc
self.tid = tid
self.tags = tags
self.dtime = dtime
self._init_data()
def _init_data(self):
self.data = Data()
self.data.copyright = self.copyright
self.data.title = self.title
self.data.desc = self.desc
self.data.tid = self.tid
self.data.set_tag(self.tags)
self.data.dtime = self.dtime
def upload(self):
with BiliBili(self.data) as bili:
bili.login_by_cookies(self.cookie_data)
bili.access_token = self.cookie_data.get('access_token')
video_part = bili.upload_file(str(self.file), lines=self.lines,
tasks=self.upload_thread_num) # 上传视频,默认线路AUTO自动选择,线程数量3。
video_part['title'] = self.title
self.data.append(video_part)
ret = bili.submit() # 提交视频
if ret.get('code') == 0:
bilibili_logger.success(f'[+] {self.file.name}上传 成功')
return True
else:
bilibili_logger.error(f'[-] {self.file.name}上传 失败, error messge: {ret.get("message")}')
return False
from pathlib import Path
from conf import BASE_DIR
Path(BASE_DIR / "cookies" / "douyin_uploader").mkdir(exist_ok=True)
\ No newline at end of file
# -*- coding: utf-8 -*-
from datetime import datetime
from playwright.async_api import Playwright, async_playwright, Page
import os
import asyncio
from conf import LOCAL_CHROME_PATH
from utils.base_social_media import set_init_script
from utils.log import douyin_logger
async def cookie_auth(account_file):
async with async_playwright() as playwright:
browser = await playwright.chromium.launch(headless=True)
context = await browser.new_context(storage_state=account_file)
context = await set_init_script(context)
# 创建一个新的页面
page = await context.new_page()
# 访问指定的 URL
await page.goto("https://creator.douyin.com/creator-micro/content/upload")
try:
await page.wait_for_url("https://creator.douyin.com/creator-micro/content/upload", timeout=5000)
except:
print("[+] 等待5秒 cookie 失效")
await context.close()
await browser.close()
return False
# 2024.06.17 抖音创作者中心改版
if await page.get_by_text('手机号登录').count():
print("[+] 等待5秒 cookie 失效")
return False
else:
print("[+] cookie 有效")
return True
async def douyin_setup(account_file, handle=False):
if not os.path.exists(account_file) or not await cookie_auth(account_file):
if not handle:
# Todo alert message
return False
douyin_logger.info('[+] cookie文件不存在或已失效,即将自动打开浏览器,请扫码登录,登陆后会自动生成cookie文件')
await douyin_cookie_gen(account_file)
return True
async def douyin_cookie_gen(account_file):
async with async_playwright() as playwright:
options = {
'headless': False
}
# Make sure to run headed.
browser = await playwright.chromium.launch(**options)
# Setup context however you like.
context = await browser.new_context() # Pass any options
context = await set_init_script(context)
# Pause the page, and start recording manually.
page = await context.new_page()
await page.goto("https://creator.douyin.com/")
await page.pause()
# 点击调试器的继续,保存cookie
await context.storage_state(path=account_file)
class DouYinVideo(object):
def __init__(self, title, file_path, tags, publish_date: datetime, account_file, thumbnail_path=None):
self.title = title # 视频标题
self.file_path = file_path
self.tags = tags
self.publish_date = publish_date
self.account_file = account_file
self.date_format = '%Y年%m月%d日 %H:%M'
self.local_executable_path = LOCAL_CHROME_PATH
self.thumbnail_path = thumbnail_path
async def set_schedule_time_douyin(self, page, publish_date):
# 选择包含特定文本内容的 label 元素
label_element = page.locator("[class^='radio']:has-text('定时发布')")
# 在选中的 label 元素下点击 checkbox
await label_element.click()
await asyncio.sleep(1)
publish_date_hour = publish_date.strftime("%Y-%m-%d %H:%M")
await asyncio.sleep(1)
await page.locator('.semi-input[placeholder="日期和时间"]').click()
await page.keyboard.press("Control+KeyA")
await page.keyboard.type(str(publish_date_hour))
await page.keyboard.press("Enter")
await asyncio.sleep(1)
async def handle_upload_error(self, page):
douyin_logger.info('视频出错了,重新上传中')
await page.locator('div.progress-div [class^="upload-btn-input"]').set_input_files(self.file_path)
async def upload(self, playwright: Playwright) -> None:
# 使用 Chromium 浏览器启动一个浏览器实例
if self.local_executable_path:
browser = await playwright.chromium.launch(headless=False, executable_path=self.local_executable_path)
else:
browser = await playwright.chromium.launch(headless=False)
# 创建一个浏览器上下文,使用指定的 cookie 文件
context = await browser.new_context(storage_state=f"{self.account_file}")
context = await set_init_script(context)
# 创建一个新的页面
page = await context.new_page()
# 访问指定的 URL
await page.goto("https://creator.douyin.com/creator-micro/content/upload")
douyin_logger.info(f'[+]正在上传-------{self.title}.mp4')
# 等待页面跳转到指定的 URL,没进入,则自动等待到超时
douyin_logger.info(f'[-] 正在打开主页...')
await page.wait_for_url("https://creator.douyin.com/creator-micro/content/upload")
# 点击 "上传视频" 按钮
await page.locator("div[class^='container'] input").set_input_files(self.file_path)
# 等待页面跳转到指定的 URL
while True:
# 判断是是否进入视频发布页面,没进入,则自动等待到超时
try:
await page.wait_for_url(
"https://creator.douyin.com/creator-micro/content/publish?enter_from=publish_page")
break
except:
douyin_logger.info(f' [-] 正在等待进入视频发布页面...')
await asyncio.sleep(0.1)
# 填充标题和话题
# 检查是否存在包含输入框的元素
# 这里为了避免页面变化,故使用相对位置定位:作品标题父级右侧第一个元素的input子元素
await asyncio.sleep(1)
douyin_logger.info(f' [-] 正在填充标题和话题...')
title_container = page.get_by_text('作品标题').locator("..").locator("xpath=following-sibling::div[1]").locator("input")
if await title_container.count():
await title_container.fill(self.title[:30])
else:
titlecontainer = page.locator(".notranslate")
await titlecontainer.click()
await page.keyboard.press("Backspace")
await page.keyboard.press("Control+KeyA")
await page.keyboard.press("Delete")
await page.keyboard.type(self.title)
await page.keyboard.press("Enter")
css_selector = ".zone-container"
for index, tag in enumerate(self.tags, start=1):
await page.type(css_selector, "#" + tag)
await page.press(css_selector, "Space")
douyin_logger.info(f'总共添加{len(self.tags)}个话题')
while True:
# 判断重新上传按钮是否存在,如果不存在,代表视频正在上传,则等待
try:
# 新版:定位重新上传
number = await page.locator('[class^="long-card"] div:has-text("重新上传")').count()
if number > 0:
douyin_logger.success(" [-]视频上传完毕")
break
else:
douyin_logger.info(" [-] 正在上传视频中...")
await asyncio.sleep(2)
if await page.locator('div.progress-div > div:has-text("上传失败")').count():
douyin_logger.error(" [-] 发现上传出错了... 准备重试")
await self.handle_upload_error(page)
except:
douyin_logger.info(" [-] 正在上传视频中...")
await asyncio.sleep(2)
#上传视频封面
await self.set_thumbnail(page, self.thumbnail_path)
# 更换可见元素
await self.set_location(page, "杭州市")
# 頭條/西瓜
third_part_element = '[class^="info"] > [class^="first-part"] div div.semi-switch'
# 定位是否有第三方平台
if await page.locator(third_part_element).count():
# 检测是否是已选中状态
if 'semi-switch-checked' not in await page.eval_on_selector(third_part_element, 'div => div.className'):
await page.locator(third_part_element).locator('input.semi-switch-native-control').click()
if self.publish_date != 0:
await self.set_schedule_time_douyin(page, self.publish_date)
# 判断视频是否发布成功
while True:
# 判断视频是否发布成功
try:
publish_button = page.get_by_role('button', name="发布", exact=True)
if await publish_button.count():
await publish_button.click()
await page.wait_for_url("https://creator.douyin.com/creator-micro/content/manage**",
timeout=3000) # 如果自动跳转到作品页面,则代表发布成功
douyin_logger.success(" [-]视频发布成功")
break
except:
douyin_logger.info(" [-] 视频正在发布中...")
await page.screenshot(full_page=True)
await asyncio.sleep(0.5)
await context.storage_state(path=self.account_file) # 保存cookie
douyin_logger.success(' [-]cookie更新完毕!')
await asyncio.sleep(2) # 这里延迟是为了方便眼睛直观的观看
# 关闭浏览器上下文和浏览器实例
await context.close()
await browser.close()
async def set_thumbnail(self, page: Page, thumbnail_path: str):
if thumbnail_path:
await page.click('text="选择封面"')
await page.wait_for_selector("div.semi-modal-content:visible")
await page.click('text="设置竖封面"')
await page.wait_for_timeout(2000) # 等待2秒
# 定位到上传区域并点击
await page.locator("div[class^='semi-upload upload'] >> input.semi-upload-hidden-input").set_input_files(thumbnail_path)
await page.wait_for_timeout(2000) # 等待2秒
await page.locator("div[class^='extractFooter'] button:visible:has-text('完成')").click()
# finish_confirm_element = page.locator("div[class^='confirmBtn'] >> div:has-text('完成')")
# if await finish_confirm_element.count():
# await finish_confirm_element.click()
# await page.locator("div[class^='footer'] button:has-text('完成')").click()
async def set_location(self, page: Page, location: str = "杭州市"):
# todo supoort location later
# await page.get_by_text('添加标签').locator("..").locator("..").locator("xpath=following-sibling::div").locator(
# "div.semi-select-single").nth(0).click()
await page.locator('div.semi-select span:has-text("输入地理位置")').click()
await page.keyboard.press("Backspace")
await page.wait_for_timeout(2000)
await page.keyboard.type(location)
await page.wait_for_selector('div[role="listbox"] [role="option"]', timeout=5000)
await page.locator('div[role="listbox"] [role="option"]').first.click()
async def main(self):
async with async_playwright() as playwright:
await self.upload(playwright)
from pathlib import Path
from conf import BASE_DIR
Path(BASE_DIR / "cookies" / "ks_uploader").mkdir(exist_ok=True)
\ No newline at end of file
# -*- coding: utf-8 -*-
from datetime import datetime
from playwright.async_api import Playwright, async_playwright
import os
import asyncio
from conf import LOCAL_CHROME_PATH
from utils.base_social_media import set_init_script
from utils.files_times import get_absolute_path
from utils.log import kuaishou_logger
async def cookie_auth(account_file):
async with async_playwright() as playwright:
browser = await playwright.chromium.launch(headless=True)
context = await browser.new_context(storage_state=account_file)
context = await set_init_script(context)
# 创建一个新的页面
page = await context.new_page()
# 访问指定的 URL
await page.goto("https://cp.kuaishou.com/article/publish/video")
try:
await page.wait_for_selector("div.names div.container div.name:text('机构服务')", timeout=5000) # 等待5秒
kuaishou_logger.info("[+] 等待5秒 cookie 失效")
return False
except:
kuaishou_logger.success("[+] cookie 有效")
return True
async def ks_setup(account_file, handle=False):
account_file = get_absolute_path(account_file, "ks_uploader")
if not os.path.exists(account_file) or not await cookie_auth(account_file):
if not handle:
return False
kuaishou_logger.info('[+] cookie文件不存在或已失效,即将自动打开浏览器,请扫码登录,登陆后会自动生成cookie文件')
await get_ks_cookie(account_file)
return True
async def get_ks_cookie(account_file):
async with async_playwright() as playwright:
options = {
'args': [
'--lang en-GB'
],
'headless': False, # Set headless option here
}
# Make sure to run headed.
browser = await playwright.chromium.launch(**options)
# Setup context however you like.
context = await browser.new_context() # Pass any options
context = await set_init_script(context)
# Pause the page, and start recording manually.
page = await context.new_page()
await page.goto("https://cp.kuaishou.com")
await page.pause()
# 点击调试器的继续,保存cookie
await context.storage_state(path=account_file)
class KSVideo(object):
def __init__(self, title, file_path, tags, publish_date: datetime, account_file):
self.title = title # 视频标题
self.file_path = file_path
self.tags = tags
self.publish_date = publish_date
self.account_file = account_file
self.date_format = '%Y-%m-%d %H:%M'
self.local_executable_path = LOCAL_CHROME_PATH
async def handle_upload_error(self, page):
kuaishou_logger.error("视频出错了,重新上传中")
await page.locator('div.progress-div [class^="upload-btn-input"]').set_input_files(self.file_path)
async def upload(self, playwright: Playwright) -> None:
# 使用 Chromium 浏览器启动一个浏览器实例
print(self.local_executable_path)
if self.local_executable_path:
browser = await playwright.chromium.launch(
headless=False,
executable_path=self.local_executable_path,
)
else:
browser = await playwright.chromium.launch(
headless=False
) # 创建一个浏览器上下文,使用指定的 cookie 文件
context = await browser.new_context(storage_state=f"{self.account_file}")
context = await set_init_script(context)
context.on("close", lambda: context.storage_state(path=self.account_file))
# 创建一个新的页面
page = await context.new_page()
# 访问指定的 URL
await page.goto("https://cp.kuaishou.com/article/publish/video")
kuaishou_logger.info('正在上传-------{}.mp4'.format(self.title))
# 等待页面跳转到指定的 URL,没进入,则自动等待到超时
kuaishou_logger.info('正在打开主页...')
await page.wait_for_url("https://cp.kuaishou.com/article/publish/video")
# 点击 "上传视频" 按钮
upload_button = page.locator("button[class^='_upload-btn']")
await upload_button.wait_for(state='visible') # 确保按钮可见
async with page.expect_file_chooser() as fc_info:
await upload_button.click()
file_chooser = await fc_info.value
await file_chooser.set_files(self.file_path)
await asyncio.sleep(2)
# if not await page.get_by_text("封面编辑").count():
# raise Exception("似乎没有跳转到到编辑页面")
await asyncio.sleep(1)
# 等待按钮可交互
new_feature_button = page.locator('button[type="button"] span:text("我知道了")')
if await new_feature_button.count() > 0:
await new_feature_button.click()
kuaishou_logger.info("正在填充标题和话题...")
await page.get_by_text("描述").locator("xpath=following-sibling::div").click()
kuaishou_logger.info("clear existing title")
await page.keyboard.press("Backspace")
await page.keyboard.press("Control+KeyA")
await page.keyboard.press("Delete")
kuaishou_logger.info("filling new title")
await page.keyboard.type(self.title)
await page.keyboard.press("Enter")
# 快手只能添加3个话题
for index, tag in enumerate(self.tags[:3], start=1):
kuaishou_logger.info("正在添加第%s个话题" % index)
await page.keyboard.type(f"#{tag} ")
await asyncio.sleep(2)
max_retries = 60 # 设置最大重试次数,最大等待时间为 2 分钟
retry_count = 0
while retry_count < max_retries:
try:
# 获取包含 '上传中' 文本的元素数量
number = await page.locator("text=上传中").count()
if number == 0:
kuaishou_logger.success("视频上传完毕")
break
else:
if retry_count % 5 == 0:
kuaishou_logger.info("正在上传视频中...")
await asyncio.sleep(2)
except Exception as e:
kuaishou_logger.error(f"检查上传状态时发生错误: {e}")
await asyncio.sleep(2) # 等待 2 秒后重试
retry_count += 1
if retry_count == max_retries:
kuaishou_logger.warning("超过最大重试次数,视频上传可能未完成。")
# 定时任务
if self.publish_date != 0:
await self.set_schedule_time(page, self.publish_date)
# 判断视频是否发布成功
while True:
try:
publish_button = page.get_by_text("发布", exact=True)
if await publish_button.count() > 0:
await publish_button.click()
await asyncio.sleep(1)
confirm_button = page.get_by_text("确认发布")
if await confirm_button.count() > 0:
await confirm_button.click()
# 等待页面跳转,确认发布成功
await page.wait_for_url(
"https://cp.kuaishou.com/article/manage/video?status=2&from=publish",
timeout=5000,
)
kuaishou_logger.success("视频发布成功")
break
except Exception as e:
kuaishou_logger.info(f"视频正在发布中... 错误: {e}")
await page.screenshot(full_page=True)
await asyncio.sleep(1)
await context.storage_state(path=self.account_file) # 保存cookie
kuaishou_logger.info('cookie更新完毕!')
await asyncio.sleep(2) # 这里延迟是为了方便眼睛直观的观看
# 关闭浏览器上下文和浏览器实例
await context.close()
await browser.close()
async def main(self):
async with async_playwright() as playwright:
await self.upload(playwright)
async def set_schedule_time(self, page, publish_date):
kuaishou_logger.info("click schedule")
publish_date_hour = publish_date.strftime("%Y-%m-%d %H:%M:%S")
await page.locator("label:text('发布时间')").locator('xpath=following-sibling::div').locator(
'.ant-radio-input').nth(1).click()
await asyncio.sleep(1)
await page.locator('div.ant-picker-input input[placeholder="选择日期时间"]').click()
await asyncio.sleep(1)
await page.keyboard.press("Control+KeyA")
await page.keyboard.type(str(publish_date_hour))
await page.keyboard.press("Enter")
await asyncio.sleep(1)
from pathlib import Path
from conf import BASE_DIR
Path(BASE_DIR / "cookies" / "tencent_uploader").mkdir(exist_ok=True)
\ No newline at end of file
# -*- coding: utf-8 -*-
from datetime import datetime
from playwright.async_api import Playwright, async_playwright
import os
import asyncio
from conf import LOCAL_CHROME_PATH
from utils.base_social_media import set_init_script
from utils.files_times import get_absolute_path
from utils.log import tencent_logger
def format_str_for_short_title(origin_title: str) -> str:
# 定义允许的特殊字符
allowed_special_chars = "《》“”:+?%°"
# 移除不允许的特殊字符
filtered_chars = [char if char.isalnum() or char in allowed_special_chars else ' ' if char == ',' else '' for
char in origin_title]
formatted_string = ''.join(filtered_chars)
# 调整字符串长度
if len(formatted_string) > 16:
# 截断字符串
formatted_string = formatted_string[:16]
elif len(formatted_string) < 6:
# 使用空格来填充字符串
formatted_string += ' ' * (6 - len(formatted_string))
return formatted_string
async def cookie_auth(account_file):
async with async_playwright() as playwright:
browser = await playwright.chromium.launch(headless=True)
context = await browser.new_context(storage_state=account_file)
context = await set_init_script(context)
# 创建一个新的页面
page = await context.new_page()
# 访问指定的 URL
await page.goto("https://channels.weixin.qq.com/platform/post/create")
try:
await page.wait_for_selector('div.title-name:has-text("微信小店")', timeout=5000) # 等待5秒
tencent_logger.error("[+] 等待5秒 cookie 失效")
return False
except:
tencent_logger.success("[+] cookie 有效")
return True
async def get_tencent_cookie(account_file):
async with async_playwright() as playwright:
options = {
'args': [
'--lang en-GB'
],
'headless': False, # Set headless option here
}
# Make sure to run headed.
browser = await playwright.chromium.launch(**options)
# Setup context however you like.
context = await browser.new_context() # Pass any options
# Pause the page, and start recording manually.
context = await set_init_script(context)
page = await context.new_page()
await page.goto("https://channels.weixin.qq.com")
await page.pause()
# 点击调试器的继续,保存cookie
await context.storage_state(path=account_file)
async def weixin_setup(account_file, handle=False):
account_file = get_absolute_path(account_file, "tencent_uploader")
if not os.path.exists(account_file) or not await cookie_auth(account_file):
if not handle:
# Todo alert message
return False
tencent_logger.info('[+] cookie文件不存在或已失效,即将自动打开浏览器,请扫码登录,登陆后会自动生成cookie文件')
await get_tencent_cookie(account_file)
return True
class TencentVideo(object):
def __init__(self, title, file_path, tags, publish_date: datetime, account_file, category=None):
self.title = title # 视频标题
self.file_path = file_path
self.tags = tags
self.publish_date = publish_date
self.account_file = account_file
self.category = category
self.local_executable_path = LOCAL_CHROME_PATH
async def set_schedule_time_tencent(self, page, publish_date):
label_element = page.locator("label").filter(has_text="定时").nth(1)
await label_element.click()
await page.click('input[placeholder="请选择发表时间"]')
str_month = str(publish_date.month) if publish_date.month > 9 else "0" + str(publish_date.month)
current_month = str_month + "月"
# 获取当前的月份
page_month = await page.inner_text('span.weui-desktop-picker__panel__label:has-text("月")')
# 检查当前月份是否与目标月份相同
if page_month != current_month:
await page.click('button.weui-desktop-btn__icon__right')
# 获取页面元素
elements = await page.query_selector_all('table.weui-desktop-picker__table a')
# 遍历元素并点击匹配的元素
for element in elements:
if 'weui-desktop-picker__disabled' in await element.evaluate('el => el.className'):
continue
text = await element.inner_text()
if text.strip() == str(publish_date.day):
await element.click()
break
# 输入小时部分(假设选择11小时)
await page.click('input[placeholder="请选择时间"]')
await page.keyboard.press("Control+KeyA")
await page.keyboard.type(str(publish_date.hour))
# 选择标题栏(令定时时间生效)
await page.locator("div.input-editor").click()
async def handle_upload_error(self, page):
tencent_logger.info("视频出错了,重新上传中")
await page.locator('div.media-status-content div.tag-inner:has-text("删除")').click()
await page.get_by_role('button', name="删除", exact=True).click()
file_input = page.locator('input[type="file"]')
await file_input.set_input_files(self.file_path)
async def upload(self, playwright: Playwright) -> None:
# 使用 Chromium (这里使用系统内浏览器,用chromium 会造成h264错误
browser = await playwright.chromium.launch(headless=False, executable_path=self.local_executable_path)
# 创建一个浏览器上下文,使用指定的 cookie 文件
context = await browser.new_context(storage_state=f"{self.account_file}")
context = await set_init_script(context)
# 创建一个新的页面
page = await context.new_page()
# 访问指定的 URL
await page.goto("https://channels.weixin.qq.com/platform/post/create")
tencent_logger.info(f'[+]正在上传-------{self.title}.mp4')
# 等待页面跳转到指定的 URL,没进入,则自动等待到超时
await page.wait_for_url("https://channels.weixin.qq.com/platform/post/create")
# await page.wait_for_selector('input[type="file"]', timeout=10000)
file_input = page.locator('input[type="file"]')
await file_input.set_input_files(self.file_path)
# 填充标题和话题
await self.add_title_tags(page)
# 添加商品
# await self.add_product(page)
# 合集功能
await self.add_collection(page)
# 原创选择
await self.add_original(page)
# 检测上传状态
await self.detect_upload_status(page)
if self.publish_date != 0:
await self.set_schedule_time_tencent(page, self.publish_date)
# 添加短标题
await self.add_short_title(page)
await self.click_publish(page)
await context.storage_state(path=f"{self.account_file}") # 保存cookie
tencent_logger.success(' [-]cookie更新完毕!')
await asyncio.sleep(2) # 这里延迟是为了方便眼睛直观的观看
# 关闭浏览器上下文和浏览器实例
await context.close()
await browser.close()
async def add_short_title(self, page):
short_title_element = page.get_by_text("短标题", exact=True).locator("..").locator(
"xpath=following-sibling::div").locator(
'span input[type="text"]')
if await short_title_element.count():
short_title = format_str_for_short_title(self.title)
await short_title_element.fill(short_title)
async def click_publish(self, page):
while True:
try:
publish_buttion = page.locator('div.form-btns button:has-text("发表")')
if await publish_buttion.count():
await publish_buttion.click()
await page.wait_for_url("https://channels.weixin.qq.com/platform/post/list", timeout=1500)
tencent_logger.success(" [-]视频发布成功")
break
except Exception as e:
current_url = page.url
if "https://channels.weixin.qq.com/platform/post/list" in current_url:
tencent_logger.success(" [-]视频发布成功")
break
else:
tencent_logger.exception(f" [-] Exception: {e}")
tencent_logger.info(" [-] 视频正在发布中...")
await asyncio.sleep(0.5)
async def detect_upload_status(self, page):
while True:
# 匹配删除按钮,代表视频上传完毕,如果不存在,代表视频正在上传,则等待
try:
# 匹配删除按钮,代表视频上传完毕
if "weui-desktop-btn_disabled" not in await page.get_by_role("button", name="发表").get_attribute(
'class'):
tencent_logger.info(" [-]视频上传完毕")
break
else:
tencent_logger.info(" [-] 正在上传视频中...")
await asyncio.sleep(2)
# 出错了视频出错
if await page.locator('div.status-msg.error').count() and await page.locator(
'div.media-status-content div.tag-inner:has-text("删除")').count():
tencent_logger.error(" [-] 发现上传出错了...准备重试")
await self.handle_upload_error(page)
except:
tencent_logger.info(" [-] 正在上传视频中...")
await asyncio.sleep(2)
async def add_title_tags(self, page):
await page.locator("div.input-editor").click()
await page.keyboard.type(self.title)
await page.keyboard.press("Enter")
for index, tag in enumerate(self.tags, start=1):
await page.keyboard.type("#" + tag)
await page.keyboard.press("Space")
tencent_logger.info(f"成功添加hashtag: {len(self.tags)}")
async def add_collection(self, page):
collection_elements = page.get_by_text("添加到合集").locator("xpath=following-sibling::div").locator(
'.option-list-wrap > div')
if await collection_elements.count() > 1:
await page.get_by_text("添加到合集").locator("xpath=following-sibling::div").click()
await collection_elements.first.click()
async def add_original(self, page):
if await page.get_by_label("视频为原创").count():
await page.get_by_label("视频为原创").check()
# 检查 "我已阅读并同意 《视频号原创声明使用条款》" 元素是否存在
label_locator = await page.locator('label:has-text("我已阅读并同意 《视频号原创声明使用条款》")').is_visible()
if label_locator:
await page.get_by_label("我已阅读并同意 《视频号原创声明使用条款》").check()
await page.get_by_role("button", name="声明原创").click()
# 2023年11月20日 wechat更新: 可能新账号或者改版账号,出现新的选择页面
if await page.locator('div.label span:has-text("声明原创")').count() and self.category:
# 因处罚无法勾选原创,故先判断是否可用
if not await page.locator('div.declare-original-checkbox input.ant-checkbox-input').is_disabled():
await page.locator('div.declare-original-checkbox input.ant-checkbox-input').click()
if not await page.locator(
'div.declare-original-dialog label.ant-checkbox-wrapper.ant-checkbox-wrapper-checked:visible').count():
await page.locator('div.declare-original-dialog input.ant-checkbox-input:visible').click()
if await page.locator('div.original-type-form > div.form-label:has-text("原创类型"):visible').count():
await page.locator('div.form-content:visible').click() # 下拉菜单
await page.locator(
f'div.form-content:visible ul.weui-desktop-dropdown__list li.weui-desktop-dropdown__list-ele:has-text("{self.category}")').first.click()
await page.wait_for_timeout(1000)
if await page.locator('button:has-text("声明原创"):visible').count():
await page.locator('button:has-text("声明原创"):visible').click()
async def main(self):
async with async_playwright() as playwright:
await self.upload(playwright)
from pathlib import Path
from conf import BASE_DIR
Path(BASE_DIR / "cookies" / "tk_uploader").mkdir(exist_ok=True)
\ No newline at end of file
# -*- coding: utf-8 -*-
import re
from datetime import datetime
from playwright.async_api import Playwright, async_playwright
import os
import asyncio
from uploader.tk_uploader.tk_config import Tk_Locator
from utils.base_social_media import set_init_script
from utils.files_times import get_absolute_path
from utils.log import tiktok_logger
async def cookie_auth(account_file):
async with async_playwright() as playwright:
browser = await playwright.firefox.launch(headless=True)
context = await browser.new_context(storage_state=account_file)
context = await set_init_script(context)
# 创建一个新的页面
page = await context.new_page()
# 访问指定的 URL
await page.goto("https://www.tiktok.com/tiktokstudio/upload?lang=en")
await page.wait_for_load_state('networkidle')
try:
# 选择所有的 select 元素
select_elements = await page.query_selector_all('select')
for element in select_elements:
class_name = await element.get_attribute('class')
# 使用正则表达式匹配特定模式的 class 名称
if re.match(r'tiktok-.*-SelectFormContainer.*', class_name):
tiktok_logger.error("[+] cookie expired")
return False
tiktok_logger.success("[+] cookie valid")
return True
except:
tiktok_logger.success("[+] cookie valid")
return True
async def tiktok_setup(account_file, handle=False):
account_file = get_absolute_path(account_file, "tk_uploader")
if not os.path.exists(account_file) or not await cookie_auth(account_file):
if not handle:
return False
tiktok_logger.info('[+] cookie file is not existed or expired. Now open the browser auto. Please login with your way(gmail phone, whatever, the cookie file will generated after login')
await get_tiktok_cookie(account_file)
return True
async def get_tiktok_cookie(account_file):
async with async_playwright() as playwright:
options = {
'args': [
'--lang en-GB',
],
'headless': False, # Set headless option here
}
# Make sure to run headed.
browser = await playwright.firefox.launch(**options)
# Setup context however you like.
context = await browser.new_context() # Pass any options
context = await set_init_script(context)
# Pause the page, and start recording manually.
page = await context.new_page()
await page.goto("https://www.tiktok.com/login?lang=en")
await page.pause()
# 点击调试器的继续,保存cookie
await context.storage_state(path=account_file)
class TiktokVideo(object):
def __init__(self, title, file_path, tags, publish_date, account_file):
self.title = title
self.file_path = file_path
self.tags = tags
self.publish_date = publish_date
self.account_file = account_file
self.locator_base = None
async def set_schedule_time(self, page, publish_date):
schedule_input_element = self.locator_base.get_by_label('Schedule')
await schedule_input_element.wait_for(state='visible') # 确保按钮可见
await schedule_input_element.click()
scheduled_picker = self.locator_base.locator('div.scheduled-picker')
await scheduled_picker.locator('div.TUXInputBox').nth(1).click()
calendar_month = await self.locator_base.locator('div.calendar-wrapper span.month-title').inner_text()
n_calendar_month = datetime.strptime(calendar_month, '%B').month
schedule_month = publish_date.month
if n_calendar_month != schedule_month:
if n_calendar_month < schedule_month:
arrow = self.locator_base.locator('div.calendar-wrapper span.arrow').nth(-1)
else:
arrow = self.locator_base.locator('div.calendar-wrapper span.arrow').nth(0)
await arrow.click()
# day set
valid_days_locator = self.locator_base.locator(
'div.calendar-wrapper span.day.valid')
valid_days = await valid_days_locator.count()
for i in range(valid_days):
day_element = valid_days_locator.nth(i)
text = await day_element.inner_text()
if text.strip() == str(publish_date.day):
await day_element.click()
break
# time set
await scheduled_picker.locator('div.TUXInputBox').nth(0).click()
hour_str = publish_date.strftime("%H")
correct_minute = int(publish_date.minute / 5)
minute_str = f"{correct_minute:02d}"
hour_selector = f"span.tiktok-timepicker-left:has-text('{hour_str}')"
minute_selector = f"span.tiktok-timepicker-right:has-text('{minute_str}')"
# pick hour first
await self.locator_base.locator(hour_selector).click()
# click time button again
# 等待某个特定的元素出现或状态变化,表明UI已更新
await page.wait_for_timeout(1000) # 等待500毫秒
await scheduled_picker.locator('div.TUXInputBox').nth(0).click()
# pick minutes after
await self.locator_base.locator(minute_selector).click()
# click title to remove the focus.
await self.locator_base.locator("h1:has-text('Upload video')").click()
async def handle_upload_error(self, page):
tiktok_logger.info("video upload error retrying.")
select_file_button = self.locator_base.locator('button[aria-label="Select file"]')
async with page.expect_file_chooser() as fc_info:
await select_file_button.click()
file_chooser = await fc_info.value
await file_chooser.set_files(self.file_path)
async def upload(self, playwright: Playwright) -> None:
browser = await playwright.firefox.launch(headless=False)
context = await browser.new_context(storage_state=f"{self.account_file}")
context = await set_init_script(context)
page = await context.new_page()
await page.goto("https://www.tiktok.com/creator-center/upload")
tiktok_logger.info(f'[+]Uploading-------{self.title}.mp4')
await page.wait_for_url("https://www.tiktok.com/tiktokstudio/upload", timeout=10000)
try:
await page.wait_for_selector('iframe[data-tt="Upload_index_iframe"], div.upload-container', timeout=10000)
tiktok_logger.info("Either iframe or div appeared.")
except Exception as e:
tiktok_logger.error("Neither iframe nor div appeared within the timeout.")
await self.choose_base_locator(page)
upload_button = self.locator_base.locator(
'button:has-text("Select video"):visible')
await upload_button.wait_for(state='visible') # 确保按钮可见
async with page.expect_file_chooser() as fc_info:
await upload_button.click()
file_chooser = await fc_info.value
await file_chooser.set_files(self.file_path)
await self.add_title_tags(page)
# detact upload status
await self.detect_upload_status(page)
if self.publish_date != 0:
await self.set_schedule_time(page, self.publish_date)
await self.click_publish(page)
await context.storage_state(path=f"{self.account_file}") # save cookie
tiktok_logger.info(' [-] update cookie!')
await asyncio.sleep(2) # close delay for look the video status
# close all
await context.close()
await browser.close()
async def add_title_tags(self, page):
editor_locator = self.locator_base.locator('div.public-DraftEditor-content')
await editor_locator.click()
await page.keyboard.press("End")
await page.keyboard.press("Control+A")
await page.keyboard.press("Delete")
await page.keyboard.press("End")
await page.wait_for_timeout(1000) # 等待1秒
await page.keyboard.insert_text(self.title)
await page.wait_for_timeout(1000) # 等待1秒
await page.keyboard.press("End")
await page.keyboard.press("Enter")
# tag part
for index, tag in enumerate(self.tags, start=1):
tiktok_logger.info("Setting the %s tag" % index)
await page.keyboard.press("End")
await page.wait_for_timeout(1000) # 等待1秒
await page.keyboard.insert_text("#" + tag + " ")
await page.keyboard.press("Space")
await page.wait_for_timeout(1000) # 等待1秒
await page.keyboard.press("Backspace")
await page.keyboard.press("End")
async def click_publish(self, page):
success_flag_div = '#\\:r9\\:'
while True:
try:
publish_button = self.locator_base.locator('div.btn-post')
if await publish_button.count():
await publish_button.click()
await self.locator_base.locator(success_flag_div).wait_for(state="visible", timeout=3000)
tiktok_logger.success(" [-] video published success")
break
except Exception as e:
if await self.locator_base.locator(success_flag_div).count():
tiktok_logger.success(" [-]video published success")
break
else:
tiktok_logger.exception(f" [-] Exception: {e}")
tiktok_logger.info(" [-] video publishing")
await page.screenshot(full_page=True)
await asyncio.sleep(0.5)
async def detect_upload_status(self, page):
while True:
try:
if await self.locator_base.locator('div.btn-post > button').get_attribute("disabled") is None:
tiktok_logger.info(" [-]video uploaded.")
break
else:
tiktok_logger.info(" [-] video uploading...")
await asyncio.sleep(2)
if await self.locator_base.locator('button[aria-label="Select file"]').count():
tiktok_logger.info(" [-] found some error while uploading now retry...")
await self.handle_upload_error(page)
except:
tiktok_logger.info(" [-] video uploading...")
await asyncio.sleep(2)
async def choose_base_locator(self, page):
# await page.wait_for_selector('div.upload-container')
if await page.locator('iframe[data-tt="Upload_index_iframe"]').count():
self.locator_base = self.locator_base
else:
self.locator_base = page.locator(Tk_Locator.default)
async def main(self):
async with async_playwright() as playwright:
await self.upload(playwright)
# -*- coding: utf-8 -*-
import re
from datetime import datetime
from playwright.async_api import Playwright, async_playwright
import os
import asyncio
from conf import LOCAL_CHROME_PATH
from uploader.tk_uploader.tk_config import Tk_Locator
from utils.base_social_media import set_init_script
from utils.files_times import get_absolute_path
from utils.log import tiktok_logger
async def cookie_auth(account_file):
async with async_playwright() as playwright:
browser = await playwright.chromium.launch(headless=True)
context = await browser.new_context(storage_state=account_file)
context = await set_init_script(context)
# 创建一个新的页面
page = await context.new_page()
# 访问指定的 URL
await page.goto("https://www.tiktok.com/tiktokstudio/upload?lang=en")
await page.wait_for_load_state('networkidle')
try:
# 选择所有的 select 元素
select_elements = await page.query_selector_all('select')
for element in select_elements:
class_name = await element.get_attribute('class')
# 使用正则表达式匹配特定模式的 class 名称
if re.match(r'tiktok-.*-SelectFormContainer.*', class_name):
tiktok_logger.error("[+] cookie expired")
return False
tiktok_logger.success("[+] cookie valid")
return True
except:
tiktok_logger.success("[+] cookie valid")
return True
async def tiktok_setup(account_file, handle=False):
account_file = get_absolute_path(account_file, "tk_uploader")
if not os.path.exists(account_file) or not await cookie_auth(account_file):
if not handle:
return False
tiktok_logger.info('[+] cookie file is not existed or expired. Now open the browser auto. Please login with your way(gmail phone, whatever, the cookie file will generated after login')
await get_tiktok_cookie(account_file)
return True
async def get_tiktok_cookie(account_file):
async with async_playwright() as playwright:
options = {
'args': [
'--lang en-GB',
],
'headless': False, # Set headless option here
}
# Make sure to run headed.
browser = await playwright.chromium.launch(**options)
# Setup context however you like.
context = await browser.new_context() # Pass any options
context = await set_init_script(context)
# Pause the page, and start recording manually.
page = await context.new_page()
await page.goto("https://www.tiktok.com/login?lang=en")
await page.pause()
# 点击调试器的继续,保存cookie
await context.storage_state(path=account_file)
class TiktokVideo(object):
def __init__(self, title, file_path, tags, publish_date, account_file, thumbnail_path=None):
self.title = title
self.file_path = file_path
self.tags = tags
self.publish_date = publish_date
self.thumbnail_path = thumbnail_path
self.account_file = account_file
self.local_executable_path = LOCAL_CHROME_PATH
self.locator_base = None
async def set_schedule_time(self, page, publish_date):
schedule_input_element = self.locator_base.get_by_label('Schedule')
await schedule_input_element.wait_for(state='visible') # 确保按钮可见
await schedule_input_element.click()
if await self.locator_base.locator('div.TUXButton-content >> text=Allow').count():
await self.locator_base.locator('div.TUXButton-content >> text=Allow').click()
scheduled_picker = self.locator_base.locator('div.scheduled-picker')
await scheduled_picker.locator('div.TUXInputBox').nth(1).click()
calendar_month = await self.locator_base.locator(
'div.calendar-wrapper span.month-title').inner_text()
n_calendar_month = datetime.strptime(calendar_month, '%B').month
schedule_month = publish_date.month
if n_calendar_month != schedule_month:
if n_calendar_month < schedule_month:
arrow = self.locator_base.locator('div.calendar-wrapper span.arrow').nth(-1)
else:
arrow = self.locator_base.locator('div.calendar-wrapper span.arrow').nth(0)
await arrow.click()
# day set
valid_days_locator = self.locator_base.locator(
'div.calendar-wrapper span.day.valid')
valid_days = await valid_days_locator.count()
for i in range(valid_days):
day_element = valid_days_locator.nth(i)
text = await day_element.inner_text()
if text.strip() == str(publish_date.day):
await day_element.click()
break
# time set
await scheduled_picker.locator('div.TUXInputBox').nth(0).click()
hour_str = publish_date.strftime("%H")
correct_minute = int(publish_date.minute / 5)
minute_str = f"{correct_minute:02d}"
hour_selector = f"span.tiktok-timepicker-left:has-text('{hour_str}')"
minute_selector = f"span.tiktok-timepicker-right:has-text('{minute_str}')"
# pick hour first
await page.wait_for_timeout(500) # 等待500毫秒
await self.locator_base.locator(hour_selector).click()
# click time button again
await page.wait_for_timeout(500) # 等待500毫秒
await scheduled_picker.locator('div.TUXInputBox').nth(0).click()
await page.wait_for_timeout(500) # 等待500毫秒
# pick minutes after
await scheduled_picker.locator('div.TUXInputBox').nth(0).click()
await page.wait_for_timeout(500) # 等待500毫秒
await self.locator_base.locator(minute_selector).click()
# click title to remove the focus.
# await self.locator_base.locator("h1:has-text('Upload video')").click()
async def handle_upload_error(self, page):
tiktok_logger.info("video upload error retrying.")
select_file_button = self.locator_base.locator('button[aria-label="Select file"]')
async with page.expect_file_chooser() as fc_info:
await select_file_button.click()
file_chooser = await fc_info.value
await file_chooser.set_files(self.file_path)
async def upload(self, playwright: Playwright) -> None:
browser = await playwright.chromium.launch(headless=False, executable_path=self.local_executable_path)
context = await browser.new_context(storage_state=f"{self.account_file}")
context = await set_init_script(context)
page = await context.new_page()
# change language to eng first
await self.change_language(page)
await page.goto("https://www.tiktok.com/tiktokstudio/upload")
tiktok_logger.info(f'[+]Uploading-------{self.title}.mp4')
await page.wait_for_url("https://www.tiktok.com/tiktokstudio/upload", timeout=10000)
try:
await page.wait_for_selector('iframe[data-tt="Upload_index_iframe"], div.upload-container', timeout=10000)
tiktok_logger.info("Either iframe or div appeared.")
except Exception as e:
tiktok_logger.error("Neither iframe nor div appeared within the timeout.")
await self.choose_base_locator(page)
upload_button = self.locator_base.locator(
'button:has-text("Select video"):visible')
await upload_button.wait_for(state='visible') # 确保按钮可见
async with page.expect_file_chooser() as fc_info:
await upload_button.click()
file_chooser = await fc_info.value
await file_chooser.set_files(self.file_path)
await self.add_title_tags(page)
# detect upload status
await self.detect_upload_status(page)
if self.thumbnail_path:
tiktok_logger.info(f'[+] Uploading thumbnail file {self.title}.png')
await self.upload_thumbnails(page)
if self.publish_date != 0:
await self.set_schedule_time(page, self.publish_date)
await self.click_publish(page)
await context.storage_state(path=f"{self.account_file}") # save cookie
tiktok_logger.info(' [-] update cookie!')
await asyncio.sleep(2) # close delay for look the video status
# close all
await context.close()
await browser.close()
async def add_title_tags(self, page):
editor_locator = self.locator_base.locator('div.public-DraftEditor-content')
await editor_locator.click()
await page.keyboard.press("End")
await page.keyboard.press("Control+A")
await page.keyboard.press("Delete")
await page.keyboard.press("End")
await page.wait_for_timeout(1000) # 等待1秒
await page.keyboard.insert_text(self.title)
await page.wait_for_timeout(1000) # 等待1秒
await page.keyboard.press("End")
await page.keyboard.press("Enter")
# tag part
for index, tag in enumerate(self.tags, start=1):
tiktok_logger.info("Setting the %s tag" % index)
await page.keyboard.press("End")
await page.wait_for_timeout(1000) # 等待1秒
await page.keyboard.insert_text("#" + tag + " ")
await page.keyboard.press("Space")
await page.wait_for_timeout(1000) # 等待1秒
await page.keyboard.press("Backspace")
await page.keyboard.press("End")
async def upload_thumbnails(self, page):
await self.locator_base.locator(".cover-container").click()
await self.locator_base.locator(".cover-edit-container >> text=Upload cover").click()
async with page.expect_file_chooser() as fc_info:
await self.locator_base.locator(".upload-image-upload-area").click()
file_chooser = await fc_info.value
await file_chooser.set_files(self.thumbnail_path)
await self.locator_base.locator('div.cover-edit-panel:not(.hide-panel)').get_by_role(
"button", name="Confirm").click()
await page.wait_for_timeout(3000) # wait 3s, fix it later
async def change_language(self, page):
# set the language to english
await page.goto("https://www.tiktok.com")
await page.wait_for_url("https://www.tiktok.com/", timeout=100000)
await page.wait_for_selector('#header-more-menu-icon')
await page.locator('#header-more-menu-icon').hover()
await page.locator('[data-e2e="language-select"]').click()
await page.locator('#lang-setting-popup-list >> text=English').click()
async def click_publish(self, page):
success_flag_div = 'div.common-modal-confirm-modal'
while True:
try:
publish_button = self.locator_base.locator('div.button-group button').nth(0)
if await publish_button.count():
await publish_button.click()
await self.locator_base.locator(success_flag_div).wait_for(state="visible", timeout=3000)
tiktok_logger.success(" [-] video published success")
break
except Exception as e:
if await self.locator_base.locator(success_flag_div).count():
tiktok_logger.success(" [-]video published success")
break
else:
tiktok_logger.exception(f" [-] Exception: {e}")
tiktok_logger.info(" [-] video publishing")
await page.screenshot(full_page=True)
await asyncio.sleep(0.5)
async def detect_upload_status(self, page):
while True:
try:
# if await self.locator_base.locator('div.btn-post > button').get_attribute("disabled") is None:
if await self.locator_base.locator(
'div.button-group > button >> text=Post').get_attribute("disabled") is None:
tiktok_logger.info(" [-]video uploaded.")
break
else:
tiktok_logger.info(" [-] video uploading...")
await asyncio.sleep(2)
if await self.locator_base.locator(
'button[aria-label="Select file"]').count():
tiktok_logger.info(" [-] found some error while uploading now retry...")
await self.handle_upload_error(page)
except:
tiktok_logger.info(" [-] video uploading...")
await asyncio.sleep(2)
async def choose_base_locator(self, page):
# await page.wait_for_selector('div.upload-container')
if await page.locator('iframe[data-tt="Upload_index_iframe"]').count():
self.locator_base = page.frame_locator(Tk_Locator.tk_iframe)
else:
self.locator_base = page.locator(Tk_Locator.default)
async def main(self):
async with async_playwright() as playwright:
await self.upload(playwright)
class Tk_Locator(object):
tk_iframe = '[data-tt="Upload_index_iframe"]'
default = 'body'
import os
from utils.base_social_media import set_init_script
from utils.files_times import get_absolute_path
from utils.log import kuaishou_logger
from playwright.async_api import Playwright, async_playwright
async def wyh_setup(account_file, handle=False):
print(account_file)
account_file = get_absolute_path(account_file, "wyh_uploader")
if not os.path.exists(account_file):
if not handle:
return False
kuaishou_logger.info('[+] cookie文件不存在或已失效,即将自动打开浏览器,请扫码登录,登陆后会自动生成cookie文件')
await get_wyh_cookie(account_file)
else:
await open_wyh_main_page(account_file)
return True
async def open_wyh_main_page(account_file):
async with async_playwright() as playwright:
options = {
'args': [
'--lang en-GB'
],
'headless': False, # Set headless option here
}
# Make sure to run headed.
browser = await playwright.chromium.launch(**options)
# Setup context however you like.
context = await browser.new_context(storage_state=account_file) # Pass any options
context = await set_init_script(context)
# Pause the page, and start recording manually.
page = await context.new_page()
await page.goto("https://mp.163.com")
board = page.locator("//*[@class='homeV4__board__card__data__value']")
# html_content = await board.inner_html()
# print(html_content)
content = await board.nth(0).inner_text() #总粉丝数
print(content)
content = await board.nth(1).inner_text() #总阅读数
print(content)
content = await board.nth(2).inner_text() #总收益
print(content)
await page.get_by_text("粉丝数据").click()
await page.wait_for_timeout(5000)
await page.get_by_text("内容数据").click()
await page.wait_for_timeout(5000)
await page.get_by_text("收益数据").click()
await page.wait_for_timeout(5000)
browser.close()
async def get_wyh_cookie(account_file):
print("get_wyh_cookie")
async with async_playwright() as playwright:
options = {
'args': [
'--lang en-GB'
],
'headless': False, # Set headless option here
}
# Make sure to run headed.
browser = await playwright.chromium.launch(**options)
# Setup context however you like.
context = await browser.new_context() # Pass any options
context = await set_init_script(context)
# Pause the page, and start recording manually.
page = await context.new_page()
await page.goto("https://mp.163.com")
# await page.pause()
# 点击调试器的继续,保存cookie
# await context.storage_state(path=account_file)
# 自动登陆
await page.wait_for_timeout(20000)
frame = page.frame_locator('//iframe[contains(@id, "x-URS-iframe")]')
await frame.locator('[name="email"]').fill('liufuhua007@163.com')
await frame.locator('[name="password"]').fill("Liuyihong1023@")
await frame.locator('#dologin').click()
# await page.pause()
await page.wait_for_timeout(15000)
# 点击调试器的继续,保存cookie
await context.storage_state(path=account_file)
[account1]
cookies = changeme
import configparser
import json
import pathlib
from time import sleep
import requests
from playwright.sync_api import sync_playwright
from conf import BASE_DIR, XHS_SERVER
config = configparser.RawConfigParser()
config.read('accounts.ini')
def sign_local(uri, data=None, a1="", web_session=""):
for _ in range(10):
try:
with sync_playwright() as playwright:
stealth_js_path = pathlib.Path(BASE_DIR / "utils/stealth.min.js")
chromium = playwright.chromium
# 如果一直失败可尝试设置成 False 让其打开浏览器,适当添加 sleep 可查看浏览器状态
browser = chromium.launch(headless=True)
browser_context = browser.new_context()
browser_context.add_init_script(path=stealth_js_path)
context_page = browser_context.new_page()
context_page.goto("https://www.xiaohongshu.com")
browser_context.add_cookies([
{'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"}]
)
context_page.reload()
# 这个地方设置完浏览器 cookie 之后,如果这儿不 sleep 一下签名获取就失败了,如果经常失败请设置长一点试试
sleep(2)
encrypt_params = context_page.evaluate("([url, data]) => window._webmsxyw(url, data)", [uri, data])
return {
"x-s": encrypt_params["X-s"],
"x-t": str(encrypt_params["X-t"])
}
except Exception:
# 这儿有时会出现 window._webmsxyw is not a function 或未知跳转错误,因此加一个失败重试趴
pass
raise Exception("重试了这么多次还是无法签名成功,寄寄寄")
def sign(uri, data=None, a1="", web_session=""):
# 填写自己的 flask 签名服务端口地址
res = requests.post(f"{XHS_SERVER}/sign",
json={"uri": uri, "data": data, "a1": a1, "web_session": web_session})
signs = res.json()
return {
"x-s": signs["x-s"],
"x-t": signs["x-t"]
}
def beauty_print(data: dict):
print(json.dumps(data, ensure_ascii=False, indent=2))
import datetime
import json
import qrcode
from time import sleep
from xhs import XhsClient
from uploader.xhs_uploader.main import sign
# pip install qrcode
if __name__ == '__main__':
xhs_client = XhsClient(sign=sign, timeout=60)
print(datetime.datetime.now())
qr_res = xhs_client.get_qrcode()
qr_id = qr_res["qr_id"]
qr_code = qr_res["code"]
qr = qrcode.QRCode(version=1, error_correction=qrcode.ERROR_CORRECT_L,
box_size=50,
border=1)
qr.add_data(qr_res["url"])
qr.make()
qr.print_ascii()
while True:
check_qrcode = xhs_client.check_qrcode(qr_id, qr_code)
print(check_qrcode)
sleep(1)
if check_qrcode["code_status"] == 2:
print(json.dumps(check_qrcode["login_info"], indent=4))
print("当前 cookie:" + xhs_client.cookie)
break
print(json.dumps(xhs_client.get_self_info(), indent=4))
\ No newline at end of file
File mode changed
from pathlib import Path
from typing import List
from conf import BASE_DIR
SOCIAL_MEDIA_DOUYIN = "douyin"
SOCIAL_MEDIA_TENCENT = "tencent"
SOCIAL_MEDIA_TIKTOK = "tiktok"
SOCIAL_MEDIA_BILIBILI = "bilibili"
SOCIAL_MEDIA_KUAISHOU = "kuaishou"
SOCIAL_MEDIA_WANGYIHAO = 'wangyihao'
def get_supported_social_media() -> List[str]:
return [SOCIAL_MEDIA_DOUYIN, SOCIAL_MEDIA_TENCENT, SOCIAL_MEDIA_TIKTOK, SOCIAL_MEDIA_KUAISHOU,SOCIAL_MEDIA_WANGYIHAO]
def get_cli_action() -> List[str]:
return ["upload", "login", "watch"]
async def set_init_script(context):
stealth_js_path = Path(BASE_DIR / "utils/stealth.min.js")
await context.add_init_script(path=stealth_js_path)
return context
import enum
class TencentZoneTypes(enum.Enum):
LIFESTYLE = '生活'
CUTE_KIDS = '萌娃'
MUSIC = '音乐'
KNOWLEDGE = '知识'
EMOTION = '情感'
TRAVEL_SCENERY = '旅行风景'
FASHION = '时尚'
FOOD = '美食'
LIFE_HACKS = '生活技巧'
DANCE = '舞蹈'
MOVIES_TV_SHOWS = '影视综艺'
SPORTS = '运动'
FUNNY = '搞笑'
CELEBRITIES = '明星名人'
NEWS_INFO = '新闻资讯'
GAMING = '游戏'
AUTOMOTIVE = '车'
ANIME = '二次元'
TALENT = '才艺'
CUTE_PETS = '萌宠'
INDUSTRY_MACHINERY_CONSTRUCTION = '机械'
ANIMALS = '动物'
PARENTING = '育儿'
TECHNOLOGY = '科技'
class VideoZoneTypes(enum.Enum):
"""
所有分区枚举
- MAINPAGE: 主页
- ANIME: 番剧
- ANIME_SERIAL: 连载中番剧
- ANIME_FINISH: 已完结番剧
- ANIME_INFORMATION: 资讯
- ANIME_OFFICAL: 官方延伸
- MOVIE: 电影
- GUOCHUANG: 国创
- GUOCHUANG_CHINESE: 国产动画
- GUOCHUANG_ORIGINAL: 国产原创相关
- GUOCHUANG_PUPPETRY: 布袋戏
- GUOCHUANG_MOTIONCOMIC: 动态漫·广播剧
- GUOCHUANG_INFORMATION: 资讯
- TELEPLAY: 电视剧
- DOCUMENTARY: 纪录片
- DOUGA: 动画
- DOUGA_MAD: MAD·AMV
- DOUGA_MMD: MMD·3D
- DOUGA_VOICE: 短片·手书·配音
- DOUGA_GARAGE_KIT: 手办·模玩
- DOUGA_TOKUSATSU: 特摄
- DOUGA_ACGNTALKS: 动漫杂谈
- DOUGA_OTHER: 综合
- GAME: 游戏
- GAME_STAND_ALONE: 单机游戏
- GAME_ESPORTS: 电子竞技
- GAME_MOBILE: 手机游戏
- GAME_ONLINE: 网络游戏
- GAME_BOARD: 桌游棋牌
- GAME_GMV: GMV
- GAME_MUSIC: 音游
- GAME_MUGEN: Mugen
- KICHIKU: 鬼畜
- KICHIKU_GUIDE: 鬼畜调教
- KICHIKU_MAD: 音MAD
- KICHIKU_MANUAL_VOCALOID: 人力VOCALOID
- KICHIKU_THEATRE: 鬼畜剧场
- KICHIKU_COURSE: 教程演示
- MUSIC: 音乐
- MUSIC_ORIGINAL: 原创音乐
- MUSIC_COVER: 翻唱
- MUSIC_PERFORM: 演奏
- MUSIC_VOCALOID: VOCALOID·UTAU
- MUSIC_LIVE: 音乐现场
- MUSIC_MV: MV
- MUSIC_COMMENTARY: 乐评盘点
- MUSIC_TUTORIAL: 音乐教学
- MUSIC_OTHER: 音乐综合
- DANCE: 舞蹈
- DANCE_OTAKU: 宅舞
- DANCE_HIPHOP: 街舞
- DANCE_STAR: 明星舞蹈
- DANCE_CHINA: 中国舞
- DANCE_THREE_D: 舞蹈综合
- DANCE_DEMO: 舞蹈教程
- CINEPHILE: 影视
- CINEPHILE_CINECISM: 影视杂谈
- CINEPHILE_MONTAGE: 影视剪辑
- CINEPHILE_SHORTFILM: 小剧场
- CINEPHILE_TRAILER_INFO: 预告·资讯
- ENT: 娱乐
- ENT_VARIETY: 综艺
- ENT_TALKER: 娱乐杂谈
- ENT_FANS: 粉丝创作
- ENT_CELEBRITY: 明星综合
- KNOWLEDGE: 知识
- KNOWLEDGE_SCIENCE: 科学科普
- KNOWLEDGE_SOCIAL_SCIENCE: 社科·法律·心理
- KNOWLEDGE_HUMANITY_HISTORY: 人文历史
- KNOWLEDGE_BUSINESS: 财经商业
- KNOWLEDGE_CAMPUS: 校园学习
- KNOWLEDGE_CAREER: 职业职场
- KNOWLEDGE_DESIGN: 设计·创意
- KNOWLEDGE_SKILL: 野生技能协会
- TECH: 科技
- TECH_DIGITAL: 数码
- TECH_APPLICATION: 软件应用
- TECH_COMPUTER_TECH: 计算机技术
- TECH_INDUSTRY: 科工机械
- INFORMATION: 资讯
- INFORMATION_HOTSPOT: 热点
- INFORMATION_GLOBAL: 环球
- INFORMATION_SOCIAL: 社会
- INFORMATION_MULTIPLE: 综合
- FOOD: 美食
- FOOD_MAKE: 美食制作
- FOOD_DETECTIVE: 美食侦探
- FOOD_MEASUREMENT: 美食测评
- FOOD_RURAL: 田园美食
- FOOD_RECORD: 美食记录
- LIFE: 生活
- LIFE_FUNNY: 搞笑
- LIFE_TRAVEL: 出行
- LIFE_RURALLIFE: 三农
- LIFE_HOME: 家居房产
- LIFE_HANDMAKE: 手工
- LIFE_PAINTING: 绘画
- LIFE_DAILY: 日常
- CAR: 汽车
- CAR_RACING: 赛车
- CAR_MODIFIEDVEHICLE: 改装玩车
- CAR_NEWENERGYVEHICLE: 新能源车
- CAR_TOURINGCAR: 房车
- CAR_MOTORCYCLE: 摩托车
- CAR_STRATEGY: 购车攻略
- CAR_LIFE: 汽车生活
- FASHION: 时尚
- FASHION_MAKEUP: 美妆护肤
- FASHION_COS: 仿妆cos
- FASHION_CLOTHING: 穿搭
- FASHION_TREND: 时尚潮流
- SPORTS: 运动
- SPORTS_BASKETBALL: 篮球
- SPORTS_FOOTBALL: 足球
- SPORTS_AEROBICS: 健身
- SPORTS_ATHLETIC: 竞技体育
- SPORTS_CULTURE: 运动文化
- SPORTS_COMPREHENSIVE: 运动综合
- ANIMAL: 动物圈
- ANIMAL_CAT: 喵星人
- ANIMAL_DOG: 汪星人
- ANIMAL_PANDA: 大熊猫
- ANIMAL_WILD_ANIMAL: 野生动物
- ANIMAL_REPTILES: 爬宠
- ANIMAL_COMPOSITE: 动物综合
- VLOG: VLOG
"""
MAINPAGE = 0
ANIME = 13
ANIME_SERIAL = 33
ANIME_FINISH = 32
ANIME_INFORMATION = 51
ANIME_OFFICAL = 152
MOVIE = 23
GUOCHUANG = 167
GUOCHUANG_CHINESE = 153
GUOCHUANG_ORIGINAL = 168
GUOCHUANG_PUPPETRY = 169
GUOCHUANG_MOTIONCOMIC = 195
GUOCHUANG_INFORMATION = 170
TELEPLAY = 11
DOCUMENTARY = 177
DOUGA = 1
DOUGA_MAD = 24
DOUGA_MMD = 25
DOUGA_VOICE = 47
DOUGA_GARAGE_KIT = 210
DOUGA_TOKUSATSU = 86
DOUGA_ACGNTALKS = 253
DOUGA_OTHER = 27
GAME = 4
GAME_STAND_ALONE = 17
GAME_ESPORTS = 171
GAME_MOBILE = 172
GAME_ONLINE = 65
GAME_BOARD = 173
GAME_GMV = 121
GAME_MUSIC = 136
GAME_MUGEN = 19
KICHIKU = 119
KICHIKU_GUIDE = 22
KICHIKU_MAD = 26
KICHIKU_MANUAL_VOCALOID = 126
KICHIKU_THEATRE = 216
KICHIKU_COURSE = 127
MUSIC = 3
MUSIC_ORIGINAL = 28
MUSIC_COVER = 31
MUSIC_PERFORM = 59
MUSIC_VOCALOID = 30
MUSIC_LIVE = 29
MUSIC_MV = 193
MUSIC_COMMENTARY = 243
MUSIC_TUTORIAL = 244
MUSIC_OTHER = 130
DANCE = 129
DANCE_OTAKU = 20
DANCE_HIPHOP = 198
DANCE_STAR = 199
DANCE_CHINA = 200
DANCE_THREE_D = 154
DANCE_DEMO = 156
CINEPHILE = 181
CINEPHILE_CINECISM = 182
CINEPHILE_MONTAGE = 183
CINEPHILE_SHORTFILM = 85
CINEPHILE_TRAILER_INFO = 184
ENT = 5
ENT_VARIETY = 71
ENT_TALKER = 241
ENT_FANS = 242
ENT_CELEBRITY = 137
KNOWLEDGE = 36
KNOWLEDGE_SCIENCE = 201
KNOWLEDGE_SOCIAL_SCIENCE = 124
KNOWLEDGE_HUMANITY_HISTORY = 228
KNOWLEDGE_BUSINESS = 207
KNOWLEDGE_CAMPUS = 208
KNOWLEDGE_CAREER = 209
KNOWLEDGE_DESIGN = 229
KNOWLEDGE_SKILL = 122
TECH = 188
TECH_DIGITAL = 95
TECH_APPLICATION = 230
TECH_COMPUTER_TECH = 231
TECH_INDUSTRY = 232
INFORMATION = 202
INFORMATION_HOTSPOT = 203
INFORMATION_GLOBAL = 204
INFORMATION_SOCIAL = 205
INFORMATION_MULTIPLE = 206
FOOD = 211
FOOD_MAKE = 76
FOOD_DETECTIVE = 212
FOOD_MEASUREMENT = 213
FOOD_RURAL = 214
FOOD_RECORD = 215
LIFE = 160
LIFE_FUNNY = 138
LIFE_TRAVEL = 250
LIFE_RURALLIFE = 251
LIFE_HOME = 239
LIFE_HANDMAKE = 161
LIFE_PAINTING = 162
LIFE_DAILY = 21
CAR = 223
CAR_RACING = 245
CAR_MODIFIEDVEHICLE = 246
CAR_NEWENERGYVEHICLE = 247
CAR_TOURINGCAR = 248
CAR_MOTORCYCLE = 240
CAR_STRATEGY = 227
CAR_LIFE = 176
FASHION = 155
FASHION_MAKEUP = 157
FASHION_COS = 252
FASHION_CLOTHING = 158
FASHION_TREND = 159
SPORTS = 234
SPORTS_BASKETBALL = 235
SPORTS_FOOTBALL = 249
SPORTS_AEROBICS = 164
SPORTS_ATHLETIC = 236
SPORTS_CULTURE = 237
SPORTS_COMPREHENSIVE = 238
ANIMAL = 217
ANIMAL_CAT = 218
ANIMAL_DOG = 219
ANIMAL_PANDA = 220
ANIMAL_WILD_ANIMAL = 221
ANIMAL_REPTILES = 222
ANIMAL_COMPOSITE = 75
VLOG = 19
from datetime import timedelta
from datetime import datetime
from pathlib import Path
from conf import BASE_DIR
def get_absolute_path(relative_path: str, base_dir: str = None) -> str:
# Convert the relative path to an absolute path
absolute_path = Path(BASE_DIR) / base_dir / relative_path
return str(absolute_path)
def get_title_and_hashtags(filename):
"""
获取视频标题和 hashtag
Args:
filename: 视频文件名
Returns:
视频标题和 hashtag 列表
"""
# 获取视频标题和 hashtag txt 文件名
txt_filename = filename.replace(".mp4", ".txt")
# 读取 txt 文件
with open(txt_filename, "r", encoding="utf-8") as f:
content = f.read()
# 获取标题和 hashtag
splite_str = content.strip().split("\n")
title = splite_str[0]
hashtags = splite_str[1].replace("#", "").split(" ")
return title, hashtags
def generate_schedule_time_next_day(total_videos, videos_per_day, daily_times=None, timestamps=False, start_days=0):
"""
Generate a schedule for video uploads, starting from the next day.
Args:
- total_videos: Total number of videos to be uploaded.
- videos_per_day: Number of videos to be uploaded each day.
- daily_times: Optional list of specific times of the day to publish the videos.
- timestamps: Boolean to decide whether to return timestamps or datetime objects.
- start_days: Start from after start_days.
Returns:
- A list of scheduling times for the videos, either as timestamps or datetime objects.
"""
if videos_per_day <= 0:
raise ValueError("videos_per_day should be a positive integer")
if daily_times is None:
# Default times to publish videos if not provided
daily_times = [6, 11, 14, 16, 22]
if videos_per_day > len(daily_times):
raise ValueError("videos_per_day should not exceed the length of daily_times")
# Generate timestamps
schedule = []
current_time = datetime.now()
for video in range(total_videos):
day = video // videos_per_day + start_days + 1 # +1 to start from the next day
daily_video_index = video % videos_per_day
# Calculate the time for the current video
hour = daily_times[daily_video_index]
time_offset = timedelta(days=day, hours=hour - current_time.hour, minutes=-current_time.minute,
seconds=-current_time.second, microseconds=-current_time.microsecond)
timestamp = current_time + time_offset
schedule.append(timestamp)
if timestamps:
schedule = [int(time.timestamp()) for time in schedule]
return schedule
from pathlib import Path
from sys import stdout
from loguru import logger
from conf import BASE_DIR
def log_formatter(record: dict) -> str:
"""
Formatter for log records.
:param dict record: Log object containing log metadata & message.
:returns: str
"""
colors = {
"TRACE": "#cfe2f3",
"INFO": "#9cbfdd",
"DEBUG": "#8598ea",
"WARNING": "#dcad5a",
"SUCCESS": "#3dd08d",
"ERROR": "#ae2c2c"
}
color = colors.get(record["level"].name, "#b3cfe7")
return f"<fg #70acde>{{time:YYYY-MM-DD HH:mm:ss}}</fg #70acde> | <fg {color}>{{level}}</fg {color}>: <light-white>{{message}}</light-white>\n"
def create_logger(log_name: str, file_path: str):
"""
Create custom logger for different business modules.
:param str log_name: name of log
:param str file_path: Optional path to log file
:returns: Configured logger
"""
def filter_record(record):
return record["extra"].get("business_name") == log_name
Path(BASE_DIR / file_path).parent.mkdir(exist_ok=True)
logger.add(Path(BASE_DIR / file_path), filter=filter_record, level="INFO", rotation="10 MB", retention="10 days", backtrace=True, diagnose=True)
return logger.bind(business_name=log_name)
# Remove all existing handlers
logger.remove()
# Add a standard console handler
logger.add(stdout, colorize=True, format=log_formatter)
douyin_logger = create_logger('douyin', 'logs/douyin.log')
tencent_logger = create_logger('tencent', 'logs/tencent.log')
xhs_logger = create_logger('xhs', 'logs/xhs.log')
tiktok_logger = create_logger('tiktok', 'logs/tiktok.log')
bilibili_logger = create_logger('bilibili', 'logs/bilibili.log')
kuaishou_logger = create_logger('kuaishou', 'logs/kuaishou.log')
This diff could not be displayed because it is too large.
No preview for this file type
这位勇敢的男子为了心爱之人每天坚守 🥺❤️‍🩹
#坚持不懈 #爱情执着 #奋斗使者 #短视频
\ No newline at end of file