main.py
10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# -*- coding: utf-8 -*-
import re
from datetime import datetime
from playwright.async_api import Playwright, async_playwright
import os
import asyncio
from uploader.tk_uploader.tk_config import Tk_Locator
from utils.base_social_media import set_init_script
from utils.files_times import get_absolute_path
from utils.log import tiktok_logger
async def cookie_auth(account_file):
async with async_playwright() as playwright:
browser = await playwright.firefox.launch(headless=True)
context = await browser.new_context(storage_state=account_file)
context = await set_init_script(context)
# 创建一个新的页面
page = await context.new_page()
# 访问指定的 URL
await page.goto("https://www.tiktok.com/tiktokstudio/upload?lang=en")
await page.wait_for_load_state('networkidle')
try:
# 选择所有的 select 元素
select_elements = await page.query_selector_all('select')
for element in select_elements:
class_name = await element.get_attribute('class')
# 使用正则表达式匹配特定模式的 class 名称
if re.match(r'tiktok-.*-SelectFormContainer.*', class_name):
tiktok_logger.error("[+] cookie expired")
return False
tiktok_logger.success("[+] cookie valid")
return True
except:
tiktok_logger.success("[+] cookie valid")
return True
async def tiktok_setup(account_file, handle=False):
account_file = get_absolute_path(account_file, "tk_uploader")
if not os.path.exists(account_file) or not await cookie_auth(account_file):
if not handle:
return False
tiktok_logger.info('[+] cookie file is not existed or expired. Now open the browser auto. Please login with your way(gmail phone, whatever, the cookie file will generated after login')
await get_tiktok_cookie(account_file)
return True
async def get_tiktok_cookie(account_file):
async with async_playwright() as playwright:
options = {
'args': [
'--lang en-GB',
],
'headless': False, # Set headless option here
}
# Make sure to run headed.
browser = await playwright.firefox.launch(**options)
# Setup context however you like.
context = await browser.new_context() # Pass any options
context = await set_init_script(context)
# Pause the page, and start recording manually.
page = await context.new_page()
await page.goto("https://www.tiktok.com/login?lang=en")
await page.pause()
# 点击调试器的继续,保存cookie
await context.storage_state(path=account_file)
class TiktokVideo(object):
def __init__(self, title, file_path, tags, publish_date, account_file):
self.title = title
self.file_path = file_path
self.tags = tags
self.publish_date = publish_date
self.account_file = account_file
self.locator_base = None
async def set_schedule_time(self, page, publish_date):
schedule_input_element = self.locator_base.get_by_label('Schedule')
await schedule_input_element.wait_for(state='visible') # 确保按钮可见
await schedule_input_element.click()
scheduled_picker = self.locator_base.locator('div.scheduled-picker')
await scheduled_picker.locator('div.TUXInputBox').nth(1).click()
calendar_month = await self.locator_base.locator('div.calendar-wrapper span.month-title').inner_text()
n_calendar_month = datetime.strptime(calendar_month, '%B').month
schedule_month = publish_date.month
if n_calendar_month != schedule_month:
if n_calendar_month < schedule_month:
arrow = self.locator_base.locator('div.calendar-wrapper span.arrow').nth(-1)
else:
arrow = self.locator_base.locator('div.calendar-wrapper span.arrow').nth(0)
await arrow.click()
# day set
valid_days_locator = self.locator_base.locator(
'div.calendar-wrapper span.day.valid')
valid_days = await valid_days_locator.count()
for i in range(valid_days):
day_element = valid_days_locator.nth(i)
text = await day_element.inner_text()
if text.strip() == str(publish_date.day):
await day_element.click()
break
# time set
await scheduled_picker.locator('div.TUXInputBox').nth(0).click()
hour_str = publish_date.strftime("%H")
correct_minute = int(publish_date.minute / 5)
minute_str = f"{correct_minute:02d}"
hour_selector = f"span.tiktok-timepicker-left:has-text('{hour_str}')"
minute_selector = f"span.tiktok-timepicker-right:has-text('{minute_str}')"
# pick hour first
await self.locator_base.locator(hour_selector).click()
# click time button again
# 等待某个特定的元素出现或状态变化,表明UI已更新
await page.wait_for_timeout(1000) # 等待500毫秒
await scheduled_picker.locator('div.TUXInputBox').nth(0).click()
# pick minutes after
await self.locator_base.locator(minute_selector).click()
# click title to remove the focus.
await self.locator_base.locator("h1:has-text('Upload video')").click()
async def handle_upload_error(self, page):
tiktok_logger.info("video upload error retrying.")
select_file_button = self.locator_base.locator('button[aria-label="Select file"]')
async with page.expect_file_chooser() as fc_info:
await select_file_button.click()
file_chooser = await fc_info.value
await file_chooser.set_files(self.file_path)
async def upload(self, playwright: Playwright) -> None:
browser = await playwright.firefox.launch(headless=False)
context = await browser.new_context(storage_state=f"{self.account_file}")
context = await set_init_script(context)
page = await context.new_page()
await page.goto("https://www.tiktok.com/creator-center/upload")
tiktok_logger.info(f'[+]Uploading-------{self.title}.mp4')
await page.wait_for_url("https://www.tiktok.com/tiktokstudio/upload", timeout=10000)
try:
await page.wait_for_selector('iframe[data-tt="Upload_index_iframe"], div.upload-container', timeout=10000)
tiktok_logger.info("Either iframe or div appeared.")
except Exception as e:
tiktok_logger.error("Neither iframe nor div appeared within the timeout.")
await self.choose_base_locator(page)
upload_button = self.locator_base.locator(
'button:has-text("Select video"):visible')
await upload_button.wait_for(state='visible') # 确保按钮可见
async with page.expect_file_chooser() as fc_info:
await upload_button.click()
file_chooser = await fc_info.value
await file_chooser.set_files(self.file_path)
await self.add_title_tags(page)
# detact upload status
await self.detect_upload_status(page)
if self.publish_date != 0:
await self.set_schedule_time(page, self.publish_date)
await self.click_publish(page)
await context.storage_state(path=f"{self.account_file}") # save cookie
tiktok_logger.info(' [-] update cookie!')
await asyncio.sleep(2) # close delay for look the video status
# close all
await context.close()
await browser.close()
async def add_title_tags(self, page):
editor_locator = self.locator_base.locator('div.public-DraftEditor-content')
await editor_locator.click()
await page.keyboard.press("End")
await page.keyboard.press("Control+A")
await page.keyboard.press("Delete")
await page.keyboard.press("End")
await page.wait_for_timeout(1000) # 等待1秒
await page.keyboard.insert_text(self.title)
await page.wait_for_timeout(1000) # 等待1秒
await page.keyboard.press("End")
await page.keyboard.press("Enter")
# tag part
for index, tag in enumerate(self.tags, start=1):
tiktok_logger.info("Setting the %s tag" % index)
await page.keyboard.press("End")
await page.wait_for_timeout(1000) # 等待1秒
await page.keyboard.insert_text("#" + tag + " ")
await page.keyboard.press("Space")
await page.wait_for_timeout(1000) # 等待1秒
await page.keyboard.press("Backspace")
await page.keyboard.press("End")
async def click_publish(self, page):
success_flag_div = '#\\:r9\\:'
while True:
try:
publish_button = self.locator_base.locator('div.btn-post')
if await publish_button.count():
await publish_button.click()
await self.locator_base.locator(success_flag_div).wait_for(state="visible", timeout=3000)
tiktok_logger.success(" [-] video published success")
break
except Exception as e:
if await self.locator_base.locator(success_flag_div).count():
tiktok_logger.success(" [-]video published success")
break
else:
tiktok_logger.exception(f" [-] Exception: {e}")
tiktok_logger.info(" [-] video publishing")
await page.screenshot(full_page=True)
await asyncio.sleep(0.5)
async def detect_upload_status(self, page):
while True:
try:
if await self.locator_base.locator('div.btn-post > button').get_attribute("disabled") is None:
tiktok_logger.info(" [-]video uploaded.")
break
else:
tiktok_logger.info(" [-] video uploading...")
await asyncio.sleep(2)
if await self.locator_base.locator('button[aria-label="Select file"]').count():
tiktok_logger.info(" [-] found some error while uploading now retry...")
await self.handle_upload_error(page)
except:
tiktok_logger.info(" [-] video uploading...")
await asyncio.sleep(2)
async def choose_base_locator(self, page):
# await page.wait_for_selector('div.upload-container')
if await page.locator('iframe[data-tt="Upload_index_iframe"]').count():
self.locator_base = self.locator_base
else:
self.locator_base = page.locator(Tk_Locator.default)
async def main(self):
async with async_playwright() as playwright:
await self.upload(playwright)