main.py
6.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# -*- coding: utf-8 -*-
from datetime import datetime
from playwright.async_api import Playwright, async_playwright
import os
import asyncio
import json
from conf import LOCAL_CHROME_PATH
from utils.base_social_media import set_init_script
from utils.files_times import get_absolute_path
from utils.log import sohu_logger
async def cookie_auth(account_file):
async with async_playwright() as playwright:
browser = await playwright.chromium.launch(headless=True)
context = await browser.new_context(storage_state=account_file)
await set_init_script(context)
# context = await browser.new_context()
page = await context.new_page()
# saved_cookies = await read_cookies_from_file(account_file)
# await context.add_cookies(saved_cookies)
try:
await page.goto("https://tv.sohu.com/", timeout=5000)
except:
pass
try:
await page.wait_for_selector("div.user-ico", timeout=5000) # 等待5秒
sohu_logger.success("[+] cookie 有效")
return True
except:
sohu_logger.info("[+] 等待5秒 cookie 失效")
return False
async def sohu_setup(account_file, handle=False):
account_file = get_absolute_path(account_file, "sohu_uploader")
if not os.path.exists(account_file) or not await cookie_auth(account_file):
if not handle:
return False
sohu_logger.info('[+] cookie文件不存在或已失效,即将自动打开浏览器,请扫码登录,登陆后会自动生成cookie文件')
await get_sohu_cookie(account_file)
return True
async def get_sohu_cookie(account_file):
async with async_playwright() as playwright:
options = {
'args': [
'--lang en-GB'
],
'headless': True, # Set headless option here
}
# Make sure to run headed.
browser = await playwright.chromium.launch(**options)
# Setup context however you like.
context = await browser.new_context() # Pass any options
context = await set_init_script(context)
# Pause the page, and start recording manually.
page = await context.new_page()
try:
await page.goto("https://tv.sohu.com/", timeout=5000)
except:
pass
await asyncio.sleep(1)
await page.wait_for_selector("div.user-ico", timeout=50000)
sohu_logger.info("搜狐视频登录成功")
await asyncio.sleep(3)
# await page.pause()
# 点击调试器的继续,保存cookie
# cookies = await context.cookies()
# await write_cookies_to_file(cookies, account_file)
await context.storage_state(path=account_file)
class SOHUVideo(object):
def __init__(self, title, file_path, tags, publish_date: datetime, account_file):
self.title = title # 视频标题
self.file_path = file_path
self.tags = tags
self.publish_date = publish_date
self.account_file = account_file
self.date_format = '%Y-%m-%d %H:%M'
self.local_executable_path = LOCAL_CHROME_PATH
async def handle_upload_error(self, page):
pass
# sohu_logger.error("视频出错了,重新上传中")
# await page.locator('div.progress-div [class^="upload-btn-input"]').set_input_files(self.file_path)
async def upload(self, playwright: Playwright) -> None:
# 使用 Chromium 浏览器启动一个浏览器实例
if self.local_executable_path:
browser = await playwright.chromium.launch(
headless=False,
executable_path=self.local_executable_path,
)
else:
browser = await playwright.chromium.launch(
headless=True
) # 创建一个浏览器上下文,使用指定的 cookie 文件
context = await browser.new_context(storage_state=self.account_file)
page = await context.new_page()
# saved_cookies = await read_cookies_from_file(self.account_file)
# await context.add_cookies(saved_cookies)
# context = await set_init_script(context)
# context.on("close", lambda: )
# 创建一个新的页面
# 访问指定的 URL
try:
await page.goto("https://tv.sohu.com/s/center/index.html#/", timeout=5000)
except:
pass
sohu_logger.info('正在上传-------{}.mp4'.format(self.title))
# 等待页面跳转到指定的 URL,没进入,则自动等待到超时
# 点击 "上传视频" 按钮
# await page.locator("dev.release-top").set_input_files(
# self.file_path)
await page.locator("input[type='file']").set_input_files(
self.file_path)
upload_button = page.locator('div.form-button span:has-text("发布")')
await upload_button.wait_for(state='visible') # 确保按钮可见
await asyncio.sleep(1)
sohu_logger.info("正在填充标题...")
await page.locator("input.input-topic").nth(0).fill(self.title[:99]) # 搜狐最多100
await asyncio.sleep(1)
sohu_logger.info("正在填充话题...")
inputTag = page.locator("input.input-topic").nth(1)
await inputTag.click()
for index, tag in enumerate(self.tags[:2], start=1):
await page.keyboard.type(f"#{tag}")
await page.keyboard.press("Enter")
sohu_logger.info("已写完tag")
await asyncio.sleep(1)
sohu_logger.info(self.publish_date)
# 定时任务 搜狐没有定时发布
# if self.publish_date != 0:
# await self.set_schedule_time(page, self.publish_date)
# upload_button = page.locator('button div span:has-text("定时发布")')
sohu_logger.info("提交")
await upload_button.click()
await page.wait_for_selector('h3:has-text("视频管理")')
await context.storage_state(path=self.account_file)
sohu_logger.info('cookie更新完毕!')
await asyncio.sleep(4) # 这里延迟是为了方便眼睛直观的观看
# 关闭浏览器上下文和浏览器实例
await context.close()
await browser.close()
async def main(self):
async with async_playwright() as playwright:
await self.upload(playwright)