# Source code for bioat.lib.libpatentseq

"""_summary_.

author: Herman Huanan Zhao
email: hermanzhaozzzz@gmail.com
homepage: https://github.com/hermanzhaozzzz

_description_

example 1:
    bioat list
        <in shell>:
            $ bioat list
        <in python consolo>:
            >>> from bioat.cli import Cli
            >>> bioat = Cli()
            >>> bioat.list()
            >>> print(bioat.list())

example 2:
    _example_
"""

import json
import os
import re
import sys
import time

import pandas as pd

from bioat.lib.libpandas import set_option
from bioat.lib.libpath import HOME
from bioat.lib.libspider import ProxyPool
from bioat.logger import LoggerManager

lm = LoggerManager(mod_name="bioat.lib.libpatentseq")

__all__ = ["run"]

# Module-wide state of the current query. `run` resets it to "RUN" and its
# response handler overwrites it with "FINISHED" or an error label.
STATUS = "RUN"
# Header of the query sequence; `run` stores its `seq_header` argument here so
# the response handler can use it as the "queryName" column.
SEQ_HEADER = None
# Location of the saved browser storage state; the login timestamp is kept
# next to it in "<COOKIE>.time".
COOKIE = f"{HOME}/.bioat/LENS.ORG/cookie.json"
# Directory "patentseq" next to this module.
# NOTE(review): not referenced anywhere in the visible part of this file.
CSS_PATH = os.path.join(os.path.dirname(__file__), "patentseq")
# Error identifiers that `run` looks for in exception text to decide whether
# the current proxy should be dropped and replaced.
IP_ERRORS = (
    "NS_ERROR_UNKNOWN_HOST",
    "NS_ERROR_NET_INTERRUPT",
    "NS_ERROR_PROXY_FORBIDDEN",
)


def _save_cookies(context, log_level):
    """Write the context's storage state and the current time to disk.

    The storage state is serialised as JSON into ``COOKIE``; the login time
    (whole seconds since the epoch) goes into ``COOKIE + ".time"``.

    Args:
        context: Browser context whose ``storage_state()`` is persisted.
        log_level: Level applied to the module logger for this call.
    """
    lm.set_names(func_name="_save_cookies")
    lm.set_level(log_level)

    # Save cookies.
    state = context.storage_state()
    timestamp_path = f"{COOKIE}.time"

    lm.logger.info(f"Saving cookie to {COOKIE}")
    os.makedirs(os.path.dirname(COOKIE), exist_ok=True)
    with open(COOKIE, "w") as cookie_file:
        cookie_file.write(json.dumps(state))

    lm.logger.info(f"Saving cookie time to {COOKIE}.time")
    with open(timestamp_path, "w") as timestamp_file:
        timestamp_file.write(str(int(time.time())))


def _load_cookies(browser, proxy_ip=None, log_level="DEBUG") -> None | object:
    """Create a browser context from the saved cookies if they are still fresh.

    Args:
        browser: Browser object; its ``new_context`` is called with the saved
            storage state.
        proxy_ip: Optional proxy server address applied to the new context.
        log_level: Level applied to the module logger for this call.

    Returns:
        The new browser context, or ``None`` when a fresh login is required:
        the timestamp file is missing or unreadable, the login is older than
        one hour, or the cookie file is missing, empty or not valid JSON.
    """
    # Load cookies.
    lm.set_names(func_name="_load_cookies")
    lm.set_level(log_level)

    try:
        with open(f"{COOKIE}.time") as f:
            login_time = int(f.read().strip())
    except FileNotFoundError:
        # Never logged in (or the cookies were removed): run login.
        return None
    except ValueError:
        # An empty or corrupted timestamp must not crash the run; treat it as
        # "no valid login" so the caller signs in again.
        lm.logger.warning(f"Cannot parse {COOKIE}.time, performing re-login...")
        return None

    # Check the age of the saved login.
    elapsed_time = int(time.time()) - login_time
    if elapsed_time > 60 * 60:  # 1 hour
        lm.logger.info("Cookies are expired, performing re-login...")
        return None

    if not (os.path.isfile(COOKIE) and os.path.getsize(COOKIE) > 0):
        return None

    lm.logger.info("Cookies are still valid, skip login and load cookies")
    try:
        with open(COOKIE) as f:
            storage_state = json.loads(f.read().strip())
    except json.JSONDecodeError:
        # A truncated or hand-edited cookie file is unusable; fall back to login.
        lm.logger.warning(f"Cannot parse {COOKIE}, performing re-login...")
        return None

    lm.logger.debug("new_context")
    return browser.new_context(
        storage_state=storage_state,
        proxy={"server": proxy_ip} if proxy_ip else None,
    )


def _remove_cookie(log_level):
    """Delete the saved cookie file and its timestamp file.

    Each file is removed independently, so a missing cookie file no longer
    prevents the timestamp file from being deleted (and vice versa).

    Args:
        log_level: Level applied to the module logger for this call.
    """
    lm.set_names(func_name="_remove_cookie")
    lm.set_level(log_level)

    lm.logger.info("Removing cookie")
    any_missing = False
    for path in (COOKIE, f"{COOKIE}.time"):
        try:
            os.remove(path)
        except FileNotFoundError:
            any_missing = True
    if any_missing:
        lm.logger.warning("Cookies are not found, skip remove.")


def run(
    playwright,
    username,
    password,
    seq,
    seq_header,
    proxy_server,
    output,
    headless,
    nretry,
    local_browser,
    rm_fail_cookie,
    log_level,
) -> None:
    """Sign in to lens.org, submit a protein sequence search and export the hits.

    The function launches Firefox, reuses saved cookies when possible (otherwise
    signs in through the ORCID popup and saves the cookies), fills the search
    form on the ``patseqfinder`` page and waits for the result response, which
    is parsed into a DataFrame and optionally written to ``output`` as CSV.

    Args:
        playwright: A ``playwright.sync_api.Playwright`` instance.
        username: Value typed into the ORCID "Email or 16-digit ORCID iD" field.
        password: Value typed into the ORCID "Password" field.
        seq: Query sequence filled into the search box.
        seq_header: Optional header of the query; ``>`` is stripped and the
            text before the first space becomes the ``queryName`` column.
        proxy_server: URL handed to ``ProxyPool``; falsy disables the proxy.
        output: Path of the CSV file to write; falsy skips the export.
        headless: Whether Firefox is launched headless.
        nretry: Number of retries for the login and for the query
            (each is attempted at most ``nretry + 1`` times).
        local_browser: Optional path of the Firefox executable to launch.
        rm_fail_cookie: Remove the saved cookies when the "Signed in as"
            check fails.
        log_level: Level applied to the module logger.

    Returns:
        ``None`` only when playwright cannot be imported or ``playwright`` is
        not a ``Playwright`` instance. Otherwise the function does not return.

    Raises:
        SystemExit: Code 0 once a result response has been handled (whatever
            the final ``STATUS``); code 1 when the login or query retries are
            exhausted, the login check fails, no result arrives within 300
            seconds, or the browser target is closed while waiting.
    """
    lm.set_names(func_name="run")
    lm.set_level(log_level)

    try:
        from playwright._impl._errors import TargetClosedError, TimeoutError
        from playwright.sync_api import Playwright

        assert isinstance(playwright, Playwright)
    except (ImportError, ModuleNotFoundError) as e:
        lm.set_level("info")
        lm.logger.info(e)
        lm.logger.info(
            "Unable to import playwright. please exec `python -m pip install playwright`, then try again.",
        )
        return
    except AssertionError as e:
        lm.logger.info(e)
        return

    account = {
        "username": username,
        "password": password,
    }
    # Never write the real password to the log.
    masked_account = {**account, "password": "***" if password else password}
    lm.logger.debug(f"Set account: {masked_account}")
    # headers = {
    #     # not change it, for bypassing cloudflare challenge with firefox
    #     'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)'
    # }
    # lm.logger.debug(f'Set headers: {headers}')
    url_login = "https://www.lens.org/lens/bio/"
    url_query = "https://www.lens.org/lens/bio/patseqfinder"
    lm.logger.debug(f"set url_login: {url_login}")
    lm.logger.debug(f"set url_query: {url_query}")
    # Only logged; the search itself is submitted through the page form below.
    data = {
        "queryString": seq,
        "database": "patseq-aa",
        "sequenceType": "AA",
        "blastTool": "blastp",
        "numAlignments": "50",
        "expValue": "0.1",
        "matrix": "BLOSUM62",
        "seqLength": f"{len(seq)}",
    }
    lm.logger.debug(f"Set data: {data}\n")
    lm.logger.debug(f"Set headless: {headless}")

    proxy_ip = None
    proxy_pool = None
    if proxy_server:
        proxy_pool = ProxyPool(url=proxy_server)
        # Poll the pool until it hands out a proxy.
        while True:
            proxy_ip = proxy_pool.get_proxy().get("proxy")
            lm.logger.info(f"Set proxy_server: {proxy_server}")
            if proxy_ip:
                lm.logger.debug("Get valid proxy, go on")
                break
            lm.logger.debug("No proxy found in proxy pool, waiting 10 seconds...")
            time.sleep(10)
            lm.logger.debug("Next try to get proxy")
            continue
    lm.logger.info(f"Get proxy_ip: {proxy_ip}")

    # start to test
    browser = playwright.firefox.launch(
        headless=headless,
        executable_path=local_browser if local_browser else None,
    )
    lm.logger.debug("Try to load cookies")
    context = _load_cookies(browser, proxy_ip, log_level)
    # context = None
    if context is None:
        # need login
        lm.logger.info("Cookies not exist or out of time. try to login")
        # login part
        counter = 0
        while True:
            if counter > nretry:
                lm.logger.error(
                    f"Could not load url {url_login}, already retry maximum ({nretry})number of retries",
                )
                lm.logger.debug("browser close")
                browser.close()
                lm.logger.error("Login failed")
                sys.exit(1)
            # Reset so the handlers below never close a missing context or one
            # that a previous attempt already closed.
            context = None
            try:
                lm.logger.debug("new_context")
                context = browser.new_context(
                    viewport=(
                        {"width": 1920, "height": 1080} if not headless else None
                    ),
                    proxy={"server": proxy_ip} if proxy_ip else None,
                )
                page = context.new_page()
                # page.set_extra_http_headers(headers)
                lm.logger.debug(f"Goto {url_login}")
                page.goto(url_login)
                lm.logger.debug('Click "Thanks, Got It')
                page.get_by_text("Thanks, Got It").click()
                lm.logger.debug('Click "Sign in')
                page.get_by_role("link", name="Sign in").click()
                # lm.logger.debug(f'Try to fill account info: {account}')
                # page.get_by_label("Email address or Username").click()
                # page.get_by_label("Email address or Username").fill(account['username'])
                # page.get_by_label("Email address or Username").press("Tab")
                # page.get_by_label("Password").fill(account['password'])
                # add remember me
                # page.locator("label").filter(has_text="Keep me logged in").locator("i").click()
                # bypass cloudflare challenge
                # page.frame_locator("iframe[title=\"Widget containing a Cloudflare security challenge\"]").get_by_label(
                #     "Verify you are human").check()
                # must sleep for passing cloudflare
                # time.sleep(2)
                # lm.logger.debug(f'Try to sign in account')
                # page.get_by_role("button", name="Sign in").click()
                with page.expect_popup() as page1_info:
                    lm.logger.debug('Click "SignIn with ORCID"')
                    page.locator("a").filter(has_text="SignIn with ORCID").click()
                page1 = page1_info.value
                lm.logger.debug('Click "Proceed"')
                page1.get_by_role("link", name="Proceed").click()
                time.sleep(2)
                lm.logger.debug('Click "Accept All Cookies"')
                page1.get_by_role("button", name="Accept All Cookies").click()
                time.sleep(2)
                lm.logger.debug('Click "Email or 16-digit ORCID iD"')
                page1.get_by_label("Email or 16-digit ORCID iD").click()
                lm.logger.debug('Fill "Email or 16-digit ORCID iD"')
                page1.get_by_label("Email or 16-digit ORCID iD").fill(
                    account["username"],
                )
                lm.logger.debug("Press Tab")
                page1.get_by_label("Email or 16-digit ORCID iD").press("Tab")
                lm.logger.debug('Fill "Password"')
                page1.get_by_label("Password").fill(account["password"])
                time.sleep(2)
                lm.logger.debug('Click "SIGN IN"')
                page1.get_by_role("button", name="SIGN IN", exact=True).click()
                lm.logger.debug("Waiting...")
                time.sleep(20)
                page1.close()
                _save_cookies(context, log_level)
                break  # login done, leave the retry loop
            except TimeoutError as e:
                if context is not None:
                    lm.logger.debug("context close")
                    context.close()
                lm.logger.warning(f"Could not load url {url_login}: {e}, retry...")
                time.sleep(3)
                counter += 1
                continue
            except Exception as e:
                # Match by substring so a longer message that merely contains
                # one of the codes is recognised too; rotate only when a pool
                # was actually configured.
                is_proxy_error = any(code in str(e) for code in IP_ERRORS)
                if is_proxy_error and proxy_pool is not None:
                    lm.logger.exception(
                        f"proxy error: {e}, delete ip ({proxy_ip}) from remote server and retry...",
                    )
                    proxy_pool.delete_proxy(proxy_ip)
                    proxy_ip = proxy_pool.get_proxy().get("proxy")
                    lm.logger.debug(f"Get new proxy_ip: {proxy_ip} and retry...")
                if context is not None:
                    lm.logger.debug("context close")
                    context.close()
                lm.logger.warning(f"Could not load url {url_login}: {e}, retry...")
                time.sleep(3)
                counter += 1
                continue

    # now login status is ok
    page = context.new_page()
    # page.set_extra_http_headers(headers)
    lm.logger.debug(f"Goto {url_query}")
    # query
    counter = 0
    while True:
        if counter > nretry:
            lm.logger.error(
                f"Could not load url {url_query}, already retry maximum ({nretry})number of retries",
            )
            lm.logger.debug("context close")
            context.close()
            lm.logger.debug("browser close")
            browser.close()
            lm.logger.error("Query failed")
            sys.exit(1)
        try:
            lm.logger.debug("wait until networkidle")
            page.goto(url_query, wait_until="networkidle")
            # checker1: the "Signed in as" link is present.
            # checker2: the query box inside the iframe is present.
            checker1 = page.locator("a.parent", has_text="Signed in as").count()
            checker2 = (
                page.frame_locator("iframe")
                .get_by_placeholder("Enter a query sequence.")
                .count()
            )
            if checker1:
                if checker2:
                    lm.logger.debug("checker1 and checker2 pass!")
                    login_status = True
                    rm_cookie = False
                else:
                    login_status = False
                    rm_cookie = False
                    lm.logger.error(
                        "checker2 failed, it might because that the proxy IP has been banned!",
                    )
            else:
                login_status = False
                rm_cookie = True
                lm.logger.error(
                    "checker1 failed, it might because that the cookies out of time!",
                )

            if not login_status:
                lm.logger.error(
                    f"Login failed! checker1: {bool(checker1)}, checker2: {bool(checker2)}",
                )
                lm.logger.debug("context close")
                context.close()
                lm.logger.debug("browser close")
                browser.close()
                lm.logger.error(
                    f"Query failed! checker1: {bool(checker1)}, checker2: {bool(checker2)}",
                )
                if rm_cookie:
                    lm.logger.debug(
                        "Consider to remove cookies because checker1 is failed!",
                    )
                    if rm_fail_cookie:
                        lm.logger.debug(f"Param rm_fail_cookie = {rm_fail_cookie}")
                        _remove_cookie(log_level)
                    else:
                        lm.logger.debug(f"Param rm_fail_cookie = {rm_fail_cookie}")
                        lm.logger.warning("User set to not remove cookies.")
                # SystemExit is not an Exception subclass, so the handlers
                # below do not swallow it.
                sys.exit(1)
            else:
                lm.logger.debug("Passed cookies and success login!")

            time.sleep(3)
            page.frame_locator("iframe").get_by_placeholder(
                "Enter a query sequence.",
            ).click()
            lm.logger.debug(f"Try to fill seq: {seq}")
            page.frame_locator("iframe").get_by_placeholder(
                "Enter a query sequence.",
            ).fill(seq)
            lm.logger.debug('Click "Protein"')
            page.frame_locator("iframe").get_by_text("Protein", exact=True).click()
            lm.logger.debug('Click "advanced options"')
            page.frame_locator("iframe").locator("div").filter(
                has_text=re.compile(r"^advanced options$"),
            ).locator("div").click()
            lm.logger.debug("Set maximum number of hits to 50")
            page.frame_locator("iframe").get_by_label(
                "Maximum Number of Hits to",
            ).select_option("50")
            lm.logger.debug("Set maximum e-value to 1.0")
            page.frame_locator("iframe").get_by_label(
                "Expectation value threshold",
            ).select_option("1.0")
            lm.logger.debug("Submit blastp query info")
            page.frame_locator("iframe").get_by_role(
                "button",
                name="Submit search",
            ).nth(1).click()
            time.sleep(3)
            lm.logger.info("Task has been submitted, waiting for completion")
            break
        except TimeoutError as e:
            lm.logger.warning(f"Could not load url {url_query}: {e}, retry...")
            time.sleep(3)
            counter += 1
            continue
        except Exception as e:
            is_proxy_error = any(code in str(e) for code in IP_ERRORS)
            if is_proxy_error and proxy_pool is not None:
                lm.logger.exception(
                    f"proxy error: {e}, delete ip ({proxy_ip}) from remote server and retry...",
                )
                proxy_pool.delete_proxy(proxy_ip)
                # NOTE(review): the context and page created above keep the
                # proxy they were created with; the new proxy_ip is not
                # applied to them here.
                proxy_ip = proxy_pool.get_proxy().get("proxy")
                lm.logger.debug(f"Get new proxy_ip: {proxy_ip} and retry...")
            lm.logger.warning(f"Could not load url {url_query}: {e}, retry...")
            time.sleep(3)
            counter += 1
            continue

    # set global var to check program status
    global STATUS
    global SEQ_HEADER
    STATUS = "RUN"
    SEQ_HEADER = seq_header

    def handle_response(response):
        """Parse the result response into a table and record the outcome in STATUS."""
        global STATUS
        global SEQ_HEADER
        # `full_url` is the enclosing function's variable, refreshed by the
        # polling loop below before the handler is registered.
        if (
            "#/results" in full_url
            and response.request.method == "POST"
            and response.status == 200
        ):
            try:
                # Get the response text.
                response_text = response.text()
                # Parse it as JSON, but only when it is not empty.
                if response_text:
                    json_data = json.loads(response_text)
                    df = pd.DataFrame.from_dict(json_data["hits"])
                    set_option(log_level=log_level)
                    lm.logger.info(f"Result hit:\n{df}")
                    if SEQ_HEADER:
                        SEQ_HEADER = SEQ_HEADER.replace(">", "").split(" ")[0]
                        df.insert(0, "queryName", SEQ_HEADER)
                    if output:
                        lm.logger.info(f"Export hit info to {output}")
                        df.to_csv(output, index=False)
                    STATUS = "FINISHED"
                else:
                    lm.logger.error("Response text is empty")
                    STATUS = "ResponseEmptyError"
            except json.JSONDecodeError as e:
                lm.logger.exception(f"Error decoding JSON: {e}")
                STATUS = "JSONDecodeError"
            except Exception as e:
                lm.logger.exception(f"Error while handling response: {e}")
                STATUS = "ERROR"
            finally:
                lm.logger.info(f"STATUS: {STATUS}")
                lm.logger.info("Task complete.")

    start_time = time.time()
    while True:
        # Stop as soon as the handler has changed STATUS; give up after 300 s.
        if STATUS != "RUN":
            lm.logger.info(f"STATUS={STATUS}, SPEND={int(time.time() - start_time)}s")
            break
        if int(time.time() - start_time) >= 300:
            lm.logger.error(
                "TimeoutError (waiting for more than 300s), "
                "this might be a problem with browser was terminated external",
            )
            lm.logger.error("You can try again later")
            sys.exit(1)
        try:
            full_url = page.evaluate("window.location.href")
            # if time.time() - start_time % 5 == 0:
            #     lm.logger.info(f'STATUS={STATUS}, SPEND={int(time.time() - start_time)}s, waiting for completion')
            # lm.logger.debug(f"Current URL: {page.url}")
            # lm.logger.debug(f"Full URL including hash: {full_url}")
            # NOTE(review): this call is repeated on every pass of the loop;
            # consider registering the handler once.
            page.on("response", handle_response)
        except TargetClosedError as e:
            lm.logger.exception(
                f"Meet error{e}, this might be a problem with browser was terminated external",
            )
            lm.logger.exception("you can try again later")
            sys.exit(1)
    # ---------------------
    lm.logger.debug("context close")
    context.close()
    lm.logger.debug("browser close")
    browser.close()
    lm.logger.info("End. Exit.")
    sys.exit(0)
def query_patent(
    seq: str,
    seq_header: str | None = None,
    username: str | None = None,
    password: str | None = None,
    proxy_server: str | None = None,
    output: str | None = None,
    headless: bool = True,
    nretry: int = 3,
    local_browser: str | None = None,
    rm_fail_cookie: bool = False,
    log_level: str = "INFO",
):
    """Start Playwright and hand all search options over to ``run``.

    Args:
        seq: Query sequence.
        seq_header: Optional header of the query sequence.
        username: Account name used for sign-in.
        password: Password used for sign-in.
        proxy_server: Optional proxy pool URL.
        output: Optional path of the table to export.
        headless: Whether the browser runs headless.
        nretry: Number of retries forwarded to ``run``.
        local_browser: Optional path of a local browser executable.
        rm_fail_cookie: Whether failed cookies should be removed.
        log_level: Logging level forwarded to ``run``.

    Returns:
        ``None`` when playwright cannot be imported; otherwise control is
        handed to ``run``.
    """
    try:
        from playwright.sync_api import sync_playwright
    except (ImportError, ModuleNotFoundError) as import_error:
        lm.set_level("info")
        lm.logger.info(import_error)
        lm.logger.info(
            "Unable to import playwright. please exec `python -m pip install playwright`, then try again.",
        )
        return

    # Collect the keyword arguments once, then forward them unchanged.
    run_options = {
        "username": username,
        "password": password,
        "seq": seq,
        "seq_header": seq_header,
        "proxy_server": proxy_server,
        "output": output,
        "headless": headless,
        "nretry": nretry,
        "local_browser": local_browser,
        "rm_fail_cookie": rm_fail_cookie,
        "log_level": log_level,
    }
    with sync_playwright() as playwright:
        run(playwright, **run_options)


if __name__ == "__main__":
    query_patent(
        seq="MEDDKKTTDSIRYELKDKHFWAAFLNLARHNVYITVNHINKILEEGEINRDGY",
        username="your_account",
        password="abc123456",
        proxy_server="http://127.0.0.1:8234",
    )