# main.py
# -*- coding: utf-8 -*-
from datetime import datetime
from playwright.async_api import Playwright, async_playwright, Page
import os
import asyncio
from conf import LOCAL_CHROME_PATH
from utils.base_social_media import set_init_script
from utils.log import douyin_logger
async def cookie_auth(account_file):
    """Check whether the login state saved in ``account_file`` is still accepted.

    Launches headless Chromium with the saved storage state, opens the Douyin
    creator upload page and inspects the result.

    Args:
        account_file: Path to a Playwright storage-state file.

    Returns:
        ``False`` if the upload URL is not reached within 5 seconds or the
        page shows the phone-login prompt; ``True`` otherwise.
    """
    upload_url = "https://creator.douyin.com/creator-micro/content/upload"
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=True)
        try:
            context = await browser.new_context(storage_state=account_file)
            context = await set_init_script(context)
            page = await context.new_page()
            await page.goto(upload_url)
            try:
                await page.wait_for_url(upload_url, timeout=5000)
            except Exception:
                # `Exception` (not a bare except) so that Ctrl-C and task
                # cancellation still propagate.
                print("[+] 等待5秒 cookie 失效")
                return False
            # After the 2024-06-17 creator-center redesign: a visible
            # phone-login entry means the session is no longer logged in.
            if await page.get_by_text('手机号登录').count():
                print("[+] 等待5秒 cookie 失效")
                return False
            print("[+] cookie 有效")
            return True
        finally:
            # Close the browser on every exit path, not only on timeout.
            await browser.close()
async def douyin_setup(account_file, handle=False):
    """Make sure a usable cookie file exists, optionally regenerating it.

    Args:
        account_file: Path to the storage-state (cookie) file.
        handle: When true, a missing or rejected cookie file triggers the
            interactive login flow; when false the function just reports
            failure.

    Returns:
        ``True`` if the cookie file passed validation or the login flow was
        run, ``False`` if it is missing/invalid and ``handle`` is false.
    """
    # Short-circuit keeps cookie_auth from running when the file is absent.
    cookie_ok = os.path.exists(account_file) and await cookie_auth(account_file)
    if cookie_ok:
        return True

    if not handle:
        # TODO: surface an alert message to the caller.
        return False

    douyin_logger.info('[+] cookie文件不存在或已失效,即将自动打开浏览器,请扫码登录,登陆后会自动生成cookie文件')
    await douyin_cookie_gen(account_file)
    return True
async def douyin_cookie_gen(account_file):
    """Open a headed browser for a manual login and save the session.

    Args:
        account_file: Path where the resulting Playwright storage state is
            written.
    """
    async with async_playwright() as pw:
        # Must run headed so the user can interact with the login page.
        chromium = await pw.chromium.launch(headless=False)
        ctx = await set_init_script(await chromium.new_context())
        login_page = await ctx.new_page()
        await login_page.goto("https://creator.douyin.com/")
        # Pauses here; after logging in, click "continue" in the debugger
        # so the storage state below gets saved.
        await login_page.pause()
        await ctx.storage_state(path=account_file)
class DouYinVideo(object):
    """Drives the Douyin creator site through Playwright to publish one video."""

    _UPLOAD_URL = "https://creator.douyin.com/creator-micro/content/upload"
    _PUBLISH_URL = "https://creator.douyin.com/creator-micro/content/publish?enter_from=publish_page"
    _MANAGE_URL_GLOB = "https://creator.douyin.com/creator-micro/content/manage**"

    def __init__(self, title, file_path, tags, publish_date: datetime, account_file, thumbnail_path=None):
        """Store the upload parameters.

        Args:
            title: Video title.
            file_path: Path of the video file to upload.
            tags: Iterable of topic strings; each is typed as ``#<tag>``.
            publish_date: Scheduled publish time; the value ``0`` skips
                scheduling (see ``upload``).
            account_file: Path to the Playwright storage-state file.
            thumbnail_path: Optional cover image path; falsy skips the cover.
        """
        self.title = title  # video title
        self.file_path = file_path
        self.tags = tags
        self.publish_date = publish_date
        self.account_file = account_file
        self.date_format = '%Y年%m月%d日 %H:%M'
        self.local_executable_path = LOCAL_CHROME_PATH
        self.thumbnail_path = thumbnail_path

    async def set_schedule_time_douyin(self, page, publish_date):
        """Switch the form to scheduled publishing and type in the date.

        Args:
            page: Playwright page showing the publish form.
            publish_date: ``datetime`` formatted as ``%Y-%m-%d %H:%M``.
        """
        # Select the radio element labelled "scheduled publish".
        label_element = page.locator("[class^='radio']:has-text('定时发布')")
        await label_element.click()
        publish_date_hour = publish_date.strftime("%Y-%m-%d %H:%M")
        # Same total delay as the two original one-second sleeps.
        await asyncio.sleep(2)
        await page.locator('.semi-input[placeholder="日期和时间"]').click()
        # Select the existing value so typing replaces it.
        await page.keyboard.press("Control+KeyA")
        await page.keyboard.type(publish_date_hour)
        await page.keyboard.press("Enter")
        await asyncio.sleep(1)

    async def handle_upload_error(self, page):
        """Re-submit the video file after the page reported an upload failure."""
        douyin_logger.info('视频出错了,重新上传中')
        await page.locator('div.progress-div [class^="upload-btn-input"]').set_input_files(self.file_path)

    async def upload(self, playwright: Playwright) -> None:
        """Upload and publish the video in a headed Chromium session.

        Uses ``LOCAL_CHROME_PATH`` as the browser executable when it is set.
        The browser is closed even if a step raises.

        Args:
            playwright: An active Playwright instance.
        """
        launch_options = {"headless": False}
        if self.local_executable_path:
            launch_options["executable_path"] = self.local_executable_path
        browser = await playwright.chromium.launch(**launch_options)
        try:
            # Context carries the saved login state from the cookie file.
            context = await browser.new_context(storage_state=f"{self.account_file}")
            context = await set_init_script(context)
            page = await context.new_page()
            await page.goto(self._UPLOAD_URL)
            douyin_logger.info(f'[+]正在上传-------{self.title}.mp4')
            # Waits (up to the default timeout) for the upload page URL.
            douyin_logger.info('[-] 正在打开主页...')
            await page.wait_for_url(self._UPLOAD_URL)
            # Hand the file to the upload input instead of clicking the button.
            await page.locator("div[class^='container'] input").set_input_files(self.file_path)

            await self._wait_for_publish_page(page)
            await self._fill_title_and_tags(page)
            await self._wait_for_upload_finished(page)

            # Cover image (skipped inside when no thumbnail was given).
            await self.set_thumbnail(page, self.thumbnail_path)
            await self.set_location(page, "杭州市")
            await self._enable_third_party_sync(page)

            if self.publish_date != 0:
                await self.set_schedule_time_douyin(page, self.publish_date)

            await self._publish(page)

            await context.storage_state(path=self.account_file)  # save refreshed cookies
            douyin_logger.success(' [-]cookie更新完毕!')
            await asyncio.sleep(2)  # short pause so the result can be seen on screen
            await context.close()
        finally:
            # Runs on failure too, so an error mid-flow does not leak the browser.
            await browser.close()

    async def _wait_for_publish_page(self, page) -> None:
        """Block until the page has navigated to the video publish form."""
        while True:
            try:
                await page.wait_for_url(self._PUBLISH_URL)
                break
            except Exception:
                # `Exception` rather than a bare except so cancellation and
                # Ctrl-C can still end this loop.
                douyin_logger.info(f' [-] 正在等待进入视频发布页面...')
                await asyncio.sleep(0.1)

    async def _fill_title_and_tags(self, page) -> None:
        """Enter the title and append every tag as a ``#topic``."""
        await asyncio.sleep(1)
        douyin_logger.info(f' [-] 正在填充标题和话题...')
        # Located relative to the "作品标题" label to survive page changes:
        # the input inside the first sibling div after the label's parent.
        title_container = (
            page.get_by_text('作品标题')
            .locator("..")
            .locator("xpath=following-sibling::div[1]")
            .locator("input")
        )
        if await title_container.count():
            await title_container.fill(self.title[:30])
        else:
            # Fallback: clear the rich-text editor, then type the title.
            titlecontainer = page.locator(".notranslate")
            await titlecontainer.click()
            await page.keyboard.press("Backspace")
            await page.keyboard.press("Control+KeyA")
            await page.keyboard.press("Delete")
            await page.keyboard.type(self.title)
            await page.keyboard.press("Enter")

        css_selector = ".zone-container"
        for tag in self.tags:
            await page.type(css_selector, "#" + tag)
            await page.press(css_selector, "Space")
        douyin_logger.info(f'总共添加{len(self.tags)}个话题')

    async def _wait_for_upload_finished(self, page) -> None:
        """Poll until the "re-upload" control appears, retrying failed uploads."""
        while True:
            try:
                # The "重新上传" control is absent while the upload is running.
                number = await page.locator('[class^="long-card"] div:has-text("重新上传")').count()
                if number > 0:
                    douyin_logger.success(" [-]视频上传完毕")
                    break
                douyin_logger.info(" [-] 正在上传视频中...")
                await asyncio.sleep(2)
                if await page.locator('div.progress-div > div:has-text("上传失败")').count():
                    douyin_logger.error(" [-] 发现上传出错了... 准备重试")
                    await self.handle_upload_error(page)
            except Exception:
                douyin_logger.info(" [-] 正在上传视频中...")
                await asyncio.sleep(2)

    async def _enable_third_party_sync(self, page) -> None:
        """Turn on the third-party (Toutiao/Xigua) switch if present and off."""
        third_part_element = '[class^="info"] > [class^="first-part"] div div.semi-switch'
        if not await page.locator(third_part_element).count():
            return
        class_name = await page.eval_on_selector(third_part_element, 'div => div.className')
        if 'semi-switch-checked' not in class_name:
            await page.locator(third_part_element).locator('input.semi-switch-native-control').click()

    async def _publish(self, page) -> None:
        """Click "publish" repeatedly until the site redirects to the manage page."""
        while True:
            try:
                publish_button = page.get_by_role('button', name="发布", exact=True)
                if await publish_button.count():
                    await publish_button.click()
                # A redirect to the content-manage page means publishing succeeded.
                await page.wait_for_url(self._MANAGE_URL_GLOB, timeout=3000)
                douyin_logger.success(" [-]视频发布成功")
                break
            except Exception:
                douyin_logger.info(" [-] 视频正在发布中...")
                # Kept from the original flow; the screenshot bytes are not stored.
                await page.screenshot(full_page=True)
                await asyncio.sleep(0.5)

    async def set_thumbnail(self, page: Page, thumbnail_path: str):
        """Upload ``thumbnail_path`` as the cover; do nothing if it is falsy."""
        if not thumbnail_path:
            return
        await page.click('text="选择封面"')
        await page.wait_for_selector("div.semi-modal-content:visible")
        await page.click('text="设置竖封面"')
        await page.wait_for_timeout(2000)  # wait 2 seconds
        # Feed the image to the hidden file input of the upload area.
        await page.locator("div[class^='semi-upload upload'] >> input.semi-upload-hidden-input").set_input_files(thumbnail_path)
        await page.wait_for_timeout(2000)  # wait 2 seconds
        await page.locator("div[class^='extractFooter'] button:visible:has-text('完成')").click()

    async def set_location(self, page: Page, location: str = "杭州市"):
        """Type ``location`` into the location selector and pick the first option.

        TODO: support caller-supplied locations; ``upload`` currently always
        passes "杭州市".
        """
        await page.locator('div.semi-select span:has-text("输入地理位置")').click()
        await page.keyboard.press("Backspace")
        await page.wait_for_timeout(2000)
        await page.keyboard.type(location)
        await page.wait_for_selector('div[role="listbox"] [role="option"]', timeout=5000)
        await page.locator('div[role="listbox"] [role="option"]').first.click()

    async def main(self):
        """Start Playwright and run ``upload`` inside its context manager."""
        async with async_playwright() as playwright:
            await self.upload(playwright)