main.py 6.61 KB
# -*- coding: utf-8 -*-
from datetime import datetime

from playwright.async_api import Playwright, async_playwright
import os
import asyncio
import json
from conf import LOCAL_CHROME_PATH
from utils.base_social_media import set_init_script
from utils.files_times import get_absolute_path
from utils.log import xhs_logger


async def write_cookies_to_file(cookies, file_path):

    with open(file_path, 'w') as f:
        json.dump(cookies, f)


async def read_cookies_from_file(file_path):
    try:
        with open(file_path, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return []

async def cookie_auth(account_file):
    print(account_file)
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=True)
        # context = await browser.new_context(storage_state=account_file)
        context = await browser.new_context()
        page = await context.new_page()
        saved_cookies = await read_cookies_from_file(account_file)
        await context.add_cookies(saved_cookies)
        await page.goto("https://www.xiaohongshu.com")
        try:
            print(await page.title())
            await page.wait_for_selector("div.link-wrapper a.link-wrapper span.channel", timeout=5000)  # 等待5秒
            xhs_logger.success("[+] cookie 有效")
            return True
        except:
            xhs_logger.info("[+] 等待5秒 cookie 失效")
            return False


async def xhs_setup(account_file, handle=False):
    account_file = get_absolute_path(account_file, "xhs_uploader")
    if not os.path.exists(account_file) or not await cookie_auth(account_file):
        if not handle:
            return False
        xhs_logger.info('[+] cookie文件不存在或已失效,即将自动打开浏览器,请扫码登录,登陆后会自动生成cookie文件')
        await get_xhs_cookie(account_file)
    return True


async def get_xhs_cookie(account_file):
    async with async_playwright() as playwright:
        options = {
            'args': [
                '--lang en-GB'
            ],
            'headless': False,  # Set headless option here
        }
        # Make sure to run headed.
        browser = await playwright.chromium.launch(**options)
        # Setup context however you like.
        context = await browser.new_context()  # Pass any options
        context = await set_init_script(context)
        # Pause the page, and start recording manually.
        page = await context.new_page()
        await page.goto("https://www.xiaohongshu.com")
        await asyncio.sleep(1)
        await page.wait_for_selector("div.link-wrapper a.link-wrapper span.channel", timeout=50000)
        print("小红书登录成功")
        await asyncio.sleep(3)

        # await page.pause()
        # 点击调试器的继续,保存cookie
        await asyncio.sleep(3)
        cookies = await context.cookies()
        await write_cookies_to_file(cookies, account_file)


class XHSVideo(object):
    def __init__(self, title, file_path, tags, publish_date: datetime, account_file):
        self.title = title  # 视频标题
        self.file_path = file_path
        self.tags = tags
        self.publish_date = publish_date
        self.account_file = account_file
        self.date_format = '%Y-%m-%d %H:%M'
        self.local_executable_path = LOCAL_CHROME_PATH

    async def handle_upload_error(self, page):
        xhs_logger.error("视频出错了,重新上传中")
        await page.locator('div.progress-div [class^="upload-btn-input"]').set_input_files(self.file_path)

    async def upload(self, playwright: Playwright) -> None:
        # 使用 Chromium 浏览器启动一个浏览器实例
        if self.local_executable_path:
            browser = await playwright.chromium.launch(
                headless=False,
                executable_path=self.local_executable_path,
            )
        else:
            browser = await playwright.chromium.launch(
                headless=True
            )  # 创建一个浏览器上下文,使用指定的 cookie 文件
        context = await browser.new_context()
        page = await context.new_page()
        saved_cookies = await read_cookies_from_file(self.account_file)
        await context.add_cookies(saved_cookies)
        # context = await set_init_script(context)
        # context.on("close", lambda: )

        # 创建一个新的页面
        # 访问指定的 URL
        await page.goto("https://creator.xiaohongshu.com/publish/publish?source=official")
        xhs_logger.info('正在上传-------{}.mp4'.format(self.title))
        # 等待页面跳转到指定的 URL,没进入,则自动等待到超时
        # 点击 "上传视频" 按钮

        await page.locator("input.upload-input").set_input_files(
            self.file_path)

        upload_button = page.locator('button div span:has-text("发布")')
        await upload_button.wait_for(state='visible')  # 确保按钮可见


        xhs_logger.info("正在填充标题和话题...")
        await page.locator("div.titleInput div div input").fill("花花")

        inputTag = page.locator('id=quillEditor').locator('div p')
        await inputTag.click()
        for index, tag in enumerate(self.tags, start=1):
            await page.keyboard.type(f"#{tag} ")
            await asyncio.sleep(2)

        xhs_logger.info("已写完tag")
        await asyncio.sleep(1)

        # 定时任务
        if self.publish_date != 0:
            await self.set_schedule_time(page, self.publish_date)
            upload_button = page.locator('button div span:has-text("定时发布")')

        await upload_button.click()
        await page.wait_for_selector('p:has-text("发布成功")')
        await write_cookies_to_file(await context.cookies(), self.account_file)
        xhs_logger.info('cookie更新完毕!')
        await asyncio.sleep(2)  # 这里延迟是为了方便眼睛直观的观看
        # 关闭浏览器上下文和浏览器实例
        await context.close()
        await browser.close()

    async def main(self):
        async with async_playwright() as playwright:
            await self.upload(playwright)

    async def set_schedule_time(self, page, publish_date):
        xhs_logger.info("click schedule")
        publish_date_hour = publish_date.strftime("%Y-%m-%d %H:%M:%S")
        await page.locator("span.el-radio__input").nth(1).click()
        await asyncio.sleep(1)

        await page.locator('div.date-picker div div input').click()
        await asyncio.sleep(1)

        await page.keyboard.press("Control+KeyA")
        await page.keyboard.type(str(publish_date_hour))
        await page.keyboard.press("Enter")
        await asyncio.sleep(1)