liufuhua007

增加头条,增加登录滑块第一版

......@@ -12,7 +12,41 @@ xj-marketing This project aims to automate the posting of videos to various soci
- [x] 小红书
- [x] 快手
- [ ] 百家号
- [ ] 网易号
- [ ] 哔哩哔哩
- [ ] 腾讯视频
- [ ] 西瓜视频
- [ ] 搜狐视频
- [ ] 爱奇艺
- [ ] 皮皮虾
- [ ] 多多视频
- [ ] 美拍
- [ ] 腾讯微视
- [ ] AcFun
- [ ] 秒拍
- [ ] 小红书商家
- [ ] 头条号
- [ ] 百家号
- [ ] 搜狐号
- [x] 网易号
- [ ] 企鹅号
- [ ] 知乎
- [ ] 一点号
- [ ] 大鱼号
- [ ] 微信公众号
- [ ] 新浪微博
- [ ] 快传号
- [ ] 雪球号
- [ ] 豆瓣
- [ ] 简书号
- [ ] 东方号
- [ ] 大风号
- [ ] 蜂网
- [ ] 美柚
- [ ] WIFI万能钥匙
- [ ] 易车号
- [ ] 车家号
- [ ] CSDN
- [ ] 微博头条
- 部分国外社交媒体:
- [x] tiktok
......

11.3 KB

......@@ -10,8 +10,9 @@ from uploader.ks_uploader.main import ks_setup, KSVideo
from uploader.tencent_uploader.main import weixin_setup, TencentVideo
from uploader.tk_uploader.main_chrome import tiktok_setup, TiktokVideo
from uploader.wyh_uploader.main import wyh_setup
from uploader.toutiao_uploader.main import toutiao_setup
from utils.base_social_media import get_supported_social_media, get_cli_action, SOCIAL_MEDIA_DOUYIN, \
SOCIAL_MEDIA_TENCENT, SOCIAL_MEDIA_TIKTOK, SOCIAL_MEDIA_KUAISHOU, SOCIAL_MEDIA_WANGYIHAO
SOCIAL_MEDIA_TENCENT, SOCIAL_MEDIA_TIKTOK, SOCIAL_MEDIA_KUAISHOU, SOCIAL_MEDIA_WANGYIHAO, SOCIAL_MEDIA_TOUTIAO
from utils.constant import TencentZoneTypes
from utils.files_times import get_title_and_hashtags
......@@ -71,6 +72,8 @@ async def main():
await ks_setup(str(account_file), handle=True)
elif args.platform == SOCIAL_MEDIA_WANGYIHAO:
await wyh_setup(str(account_file), handle=True)
elif args.platform == SOCIAL_MEDIA_TOUTIAO:
await toutiao_setup(str(account_file), handle=True)
elif args.action == 'upload':
title, tags = get_title_and_hashtags(args.video_file)
......
......@@ -6,4 +6,7 @@ cf_clearance
biliup
xhs
qrcode
loguru
\ No newline at end of file
loguru
opencv-python
numpy
pandas
\ No newline at end of file
......
import os
from utils.base_social_media import set_init_script
from utils.files_times import get_absolute_path
from utils.log import kuaishou_logger
from playwright.async_api import Playwright, async_playwright
async def toutiao_setup(account_file, handle=False):
print(account_file)
account_file = get_absolute_path(account_file, "wyh_uploader")
if not os.path.exists(account_file):
if not handle:
return False
kuaishou_logger.info('[+] cookie文件不存在或已失效,即将自动打开浏览器,请扫码登录,登陆后会自动生成cookie文件')
await get_wyh_cookie(account_file)
else:
await open_wyh_main_page(account_file)
return True
async def open_wyh_main_page(account_file):
async with async_playwright() as playwright:
options = {
'args': [
'--lang en-GB'
],
'headless': False, # Set headless option here
}
# Make sure to run headed.
browser = await playwright.chromium.launch(**options)
# Setup context however you like.
context = await browser.new_context(storage_state=account_file) # Pass any options
context = await set_init_script(context)
# Pause the page, and start recording manually.
page = await context.new_page()
await page.goto("https://mp.toutiao.com")
await page.pause()
browser.close()
async def get_wyh_cookie(account_file):
print("get_wyh_cookie")
async with async_playwright() as playwright:
options = {
'args': [
'--lang en-GB'
],
'headless': False, # Set headless option here
}
# Make sure to run headed.
browser = await playwright.chromium.launch(**options)
# Setup context however you like.
context = await browser.new_context() # Pass any options
context = await set_init_script(context)
# Pause the page, and start recording manually.
page = await context.new_page()
await page.goto("https://mp.toutiao.com")
# 手动授权登录
# await page.pause()
# 点击调试器的继续,保存cookie
# await context.storage_state(path=account_file)
# 自动登陆
await page.wait_for_timeout(5000)
await page.get_by_text('密码登录').click()
await page.wait_for_timeout(5000)
await page.get_by_placeholder("手机号/邮箱").fill('18610534668')
print("输入账号成功")
await page.wait_for_timeout(1000)
await page.get_by_placeholder('密码').fill("Liuyihong1023@")
await page.wait_for_timeout(1000)
await page.locator("//*[@class='web-login-confirm-info__checkbox']").click()
await page.wait_for_timeout(1000)
await page.locator("//*[@class='web-login-button']").click()
await page.wait_for_timeout(10000)
# 点击调试器的继续,保存cookie
await context.storage_state(path=account_file)
import random
import cv2
import numpy as np
import pandas as pd
import math
import os
# x方向一阶导中值
def get_dx_median(dx, x, y, w, h):
return np.median(dx[y:(y + h), x])
# 预处理
def pre_process(img_path):
img = cv2.imread(img_path, 1)
img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) # 转成灰度图像
_, binary = cv2.threshold(img_gray, 127, 255, cv2.THRESH_BINARY) # 将灰度图像转成二值图像
contours, hierarchy = cv2.findContours(binary, cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE) # 查找轮廓
rect_area = []
rect_arc_length = []
cnt_infos = {}
for i, cnt in enumerate(contours):
if cv2.contourArea(cnt) < 5000 or cv2.contourArea(cnt) > 25000:
continue
x, y, w, h = cv2.boundingRect(cnt)
cnt_infos[i] = {'rect_area': w * h, # 矩形面积
'rect_arclength': 2 * (w + h), # 矩形周长
'cnt_area': cv2.contourArea(cnt), # 轮廓面积
'cnt_arclength': cv2.arcLength(cnt, True), # 轮廓周长
'cnt': cnt, # 轮廓
'w': w,
'h': h,
'x': x,
'y': y,
'mean': np.mean(np.min(img[y:(y + h), x:(x + w)], axis=2)), # 矩形内像素平均
}
rect_area.append(w * h)
rect_arc_length.append(2 * (w + h))
dx = cv2.Sobel(img, -1, 1, 0, ksize=5)
return img, dx, cnt_infos
def qq_mark_pos(img_path):
if not os.path.exists(img_path):
print("文件不存在")
return 0
img, dx, cnt_infos = pre_process(img_path)
h, w = img.shape[:2]
df = pd.DataFrame(cnt_infos).T
df.head()
df['dx_mean'] = df.apply(lambda x: get_dx_median(dx, x['x'], x['y'], x['w'], x['h']), axis=1)
df['rect_ratio'] = df.apply(lambda v: v['rect_arclength'] / 4 / math.sqrt(v['rect_area'] + 1), axis=1)
df['area_ratio'] = df.apply(lambda v: v['rect_area'] / v['cnt_area'], axis=1)
df['score'] = df.apply(lambda x: abs(x['rect_ratio'] - 1), axis=1)
result = df.query('x>0').query('area_ratio<2').query('rect_area>5000').query('rect_area<20000').sort_values(
['mean', 'score', 'dx_mean']).head(2)
if len(result):
x_left = result.x.values[0]
# cv2.line(img, (x_left, 0), (x_left, h), color=(255, 0, 255))
# plt.imshow(img)
# plt.show()
return result
def get_track_list(distance):
"""
模拟轨迹 假装是人在操作
"""
# 初速度
v = 0
# 单位时间为0.2s来统计轨迹,轨迹即0.2内的位移
t = 0.2
# 位移/轨迹列表,列表内的一个元素代表0.2s的位移
tracks = []
# 当前的位移
current = 0
# 到达mid值开始减速
mid = distance * 7 / 8
distance += 10 # 先滑过一点,最后再反着滑动回来
# a = random.randint(1,3)
while current < distance:
if current < mid:
# 加速度越小,单位时间的位移越小,模拟的轨迹就越多越详细
a = random.randint(2, 4) # 加速运动
else:
a = -random.randint(3, 5) # 减速运动
# 初速度
v0 = v
# 0.2秒时间内的位移
s = v0 * t + 0.5 * a * (t ** 2)
# 当前的位置
current += s
# 添加到轨迹列表
tracks.append(round(s))
# 速度已经达到v,该速度作为下次的初速度
v = v0 + a * t
# 反着滑动到大概准确位置
for i in range(4):
tracks.append(-random.randint(2, 3))
for i in range(4):
tracks.append(-random.randint(1, 3))
return tracks
# if __name__ == "__main__":
# img_path = "bg.png"
# if not os.path.exists(img_path):
# print("文件不存在")
# res = qq_mark_pos(img_path)
# x = res.x.values[0]
# x_r = 344 * x / 699
# track = get_track_list(res.x.values[0])
# print(f'x:{x},xr:{x_r}, track:{track}')
import io
import time
from playwright.sync_api import sync_playwright, Route
from CaptchaCv2 import (get_track_list, qq_mark_pos)
distance = 0
is_reflashed_img = False
img = "bg.png"
retryTimes = 10
def handle_captcha(route: Route) -> None:
print("handle captcha begin")
response = route.fetch()
if response.status == 200:
print("handle captcha response is 200")
buffer = response.body()
# 下载指定规则url的验证码图片
if "index=1" in response.url:
is_reflashed_img = True
with open(img, "wb") as f:
f.write(buffer)
route.continue_()
def dragbox_location():
for i in range(5):
dragbox_bounding = page.frame_locator("#tcaptcha_iframe").locator(
"#tcaptcha_drag_thumb").bounding_box()
if dragbox_bounding is not None and dragbox_bounding["x"] > 20:
return dragbox_bounding
return None
def drag_to_breach(move_distance):
print('开始拖动滑块..')
drag_box = dragbox_location()
if drag_box is None:
print('未获取到滑块位置,识别失败')
return False
page.mouse.move(drag_box["x"] + drag_box["width"] / 2,
drag_box["y"] + drag_box["height"] / 2)
page.mouse.down()
location_x = drag_box["x"]
for i in move_distance:
location_x += i
page.mouse.move(location_x, drag_box["y"])
page.mouse.up()
if page.get_by_text("后重试") is not None or page.get_by_text("请控制拼图对齐缺口") is not None:
print("识别成功")
return True
print('识别失败')
return False
def calc_distance():
for i in range(retryTimes):
print(f"识别验证码距离中,当前等待轮数{i + 1}/{retryTimes}")
try:
res = qq_mark_pos(img)
distance = res.x.values[0]
if distance > 0:
print(f"获取到缺口距离:{distance}")
return distance
except Exception as e:
print(f"识别错误, 异常:{e}")
with sync_playwright() as p:
# browser = p.chromium.launch(channel="msedge",proxy={"server": "http://{}".format(proxy)})
browser = p.chromium.launch(headless=False)
iphone_12 = p.devices["iPhone 12"]
context = browser.new_context(
record_video_dir="videos/",
**iphone_12,
)
page = context.new_page()
# 下载指定规则的验证码图片
page.route("**/turing.captcha.qcloud.com/hycdn**", handle_captcha)
page.goto(
"https://wap.showstart.com/pages/passport/login/login?redirect=%252Fpages%252FmyHome%252FmyHome")
page.wait_for_timeout(5000)
page.get_by_role("spinbutton").fill("14445104596")
page.get_by_text("获取验证码").click()
frame = page.wait_for_selector("#tcaptcha_iframe")
print(frame.bounding_box())
page.wait_for_timeout(5000)
move_distance = None
for i in range(retryTimes):
print(f"滑块拖动逻辑开始,当前尝试轮数{i + 1}/{retryTimes}")
# 验证码刷新 重新计算距离
if is_reflashed_img or move_distance is None:
distance = calc_distance()
page.wait_for_timeout(200)
true_distance = distance * 353 / 680
move_distance = get_track_list(true_distance)
print(f"获取到相对滑动距离{true_distance}, 模拟拖动列表{move_distance}")
is_reflashed_img = False
drag_result = drag_to_breach(move_distance)
if drag_result:
break
page.wait_for_timeout(3000)
print("识别结束,退出程序")
# input("为方便调试,可启用此代码,避免浏览器关闭")
browser.close()
\ No newline at end of file
......@@ -9,10 +9,11 @@ SOCIAL_MEDIA_TIKTOK = "tiktok"
SOCIAL_MEDIA_BILIBILI = "bilibili"
SOCIAL_MEDIA_KUAISHOU = "kuaishou"
SOCIAL_MEDIA_WANGYIHAO = 'wangyihao'
SOCIAL_MEDIA_TOUTIAO = 'toutiao'
def get_supported_social_media() -> List[str]:
return [SOCIAL_MEDIA_DOUYIN, SOCIAL_MEDIA_TENCENT, SOCIAL_MEDIA_TIKTOK, SOCIAL_MEDIA_KUAISHOU,SOCIAL_MEDIA_WANGYIHAO]
return [SOCIAL_MEDIA_DOUYIN, SOCIAL_MEDIA_TENCENT, SOCIAL_MEDIA_TIKTOK, SOCIAL_MEDIA_KUAISHOU, SOCIAL_MEDIA_WANGYIHAO, SOCIAL_MEDIA_TOUTIAO]
def get_cli_action() -> List[str]:
......
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type