Python Decorator Pattern
import functools
import time


def retry(retry_times: int = 3):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(retry_times):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    print(f"{func.__name__} attempt {i + 1} failed with exception {e}")
                    if i == retry_times - 1:
                        # out of retries: re-raise with the original traceback
                        raise

        return wrapper

    return decorator


def timer():
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                # runs whether func returned normally or raised
                end = time.perf_counter()
                print(f"{func.__name__} cost {end - start} seconds")

        return wrapper

    return decorator

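A quick usage sketch (the flaky function below is made up for illustration): both decorators are parameterized factories, so each is applied with parentheses, and stacking them yields a call that is timed and retried.

@timer()
@retry(retry_times=3)
def flaky(n: int) -> int:
    # hypothetical function that fails on odd input
    if n % 2:
        raise ValueError("odd input")
    return n * 2


print(flaky(2))  # prints 4; the timer also prints the elapsed time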

Python Keyword -- `global`


Usage

import datetime

# declared at module (global) scope up front
current_date = datetime.datetime.now().strftime('%Y-%m-%d')

def a():
    global current_date
    current_date = '2024-09-05'

if __name__ == '__main__':
    print(current_date)
    # prints the current date, e.g. 2024-09-03
    a()
    print(current_date)
    # prints 2024-09-05
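For contrast, a minimal sketch (the function b is hypothetical) of the same assignment without global: it only creates a local variable, and the module-level current_date is untouched.

def b():
    current_date = '2024-09-05'  # local; shadows the global instead of modifying it

b()
print(current_date)  # still the original module-level value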

Python MySQL Connection Pool
import pymysql
from dbutils.pooled_db import PooledDB

from config.mysql import DATABASE_CONFIG


# A database connection pool built on DBUtils' PooledDB
class MysqlPool(object):
    __pool = None

    def __enter__(self):
        # borrow a connection from the pool and open a cursor on it
        self.conn = self.get_conn()
        self.cursor = self.conn.cursor()
        return self.conn, self.cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        # close the cursor and hand the connection back to the pool
        self.cursor.close()
        self.conn.close()

    @classmethod
    def get_conn(cls):
        if cls.__pool is None:
            cls.__pool = PooledDB(
                # module used to create the underlying connections
                creator=pymysql,
                # maximum number of connections the pool allows; tune as needed
                maxconnections=5,
                # idle connections created at startup; 0 means create none
                mincached=5,
                # maximum number of idle connections kept in the pool; 0 or None means no limit
                maxcached=5,
                # maximum number of shared connections; 0 or None shares all of them.
                # NB: effectively a no-op here, because pymysql (like MySQLdb) reports
                # threadsafety=1, so connections are never actually shared whatever
                # value is set.
                maxshared=3,
                # whether to block and wait when the pool is exhausted;
                # True waits, False raises an error immediately
                blocking=True,
                # how many times a single connection may be reused; None means unlimited
                maxusage=None,
                # commands run at the start of each session,
                # e.g. ["set datestyle to ...", "set time zone ..."]
                setsession=[],
                # when to ping the MySQL server to check it is still alive:
                # 0 = None = never,
                # 1 = default = whenever it is requested,
                # 2 = when a cursor is created,
                # 4 = when a query is executed,
                # 7 = always
                ping=0,
                **DATABASE_CONFIG
            )
        return cls.__pool.connection()

# if __name__ == '__main__':
#     with MysqlPool() as (conn, cursor):
#         cursor.execute("select * from sina_notices limit 10")
#         result = cursor.fetchall()
#         print(result)
#         if result:
#             conn.commit()
#         else:
#             conn.rollback()

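A minimal usage sketch of the context manager above (the demo_table table and its name column are assumptions, not part of this project): __enter__ returns both the connection and the cursor, so commits stay explicit.

with MysqlPool() as (conn, cursor):
    # parameterized query; pymysql does the escaping
    cursor.execute("insert into demo_table (name) values (%s)", ("marshio",))
    conn.commit()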

ModuleNotFoundError No module named '_ctypes'

Background

On Linux, running celery -A _celery worker -l INFO failed with the following error:

/root/.pyenv/versions/3.8.0/envs/python-3.8.0/lib/python3.8/site-packages/celery/platforms.py:829: SecurityWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(
 
 -------------- celery@DESKTOP-I2O884V v5.4.0 (opalescent)
--- ***** ----- 
-- ******* ---- Linux-5.15.153.1-microsoft-standard-WSL2-x86_64-with-glibc2.34 2024-07-18 10:32:43
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         spiders:0x7f0aa7fda520
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . _celery.tasks.add
  . _celery.tasks.multiplication
  . _celery.tasks.xsum

[2024-07-18 10:32:43,612: CRITICAL/MainProcess] Unrecoverable error: ModuleNotFoundError("No module named '_ctypes'")
Traceback (most recent call last):
  File "/root/.pyenv/versions/3.8.0/envs/python-3.8.0/lib/python3.8/site-packages/celery/worker/worker.py", line 202, in start
    self.blueprint.start(self)
  File "/root/.pyenv/versions/3.8.0/envs/python-3.8.0/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/root/.pyenv/versions/3.8.0/envs/python-3.8.0/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/root/.pyenv/versions/3.8.0/envs/python-3.8.0/lib/python3.8/site-packages/celery/concurrency/base.py", line 130, in start
    self.on_start()
  File "/root/.pyenv/versions/3.8.0/envs/python-3.8.0/lib/python3.8/site-packages/celery/concurrency/prefork.py", line 109, in on_start
    P = self._pool = Pool(processes=self.limit,
  File "/root/.pyenv/versions/3.8.0/envs/python-3.8.0/lib/python3.8/site-packages/celery/concurrency/asynpool.py", line 464, in __init__
    super().__init__(processes, *args, **kwargs)
  File "/root/.pyenv/versions/3.8.0/envs/python-3.8.0/lib/python3.8/site-packages/billiard/pool.py", line 1045, in __init__
    self._create_worker_process(i)
  File "/root/.pyenv/versions/3.8.0/envs/python-3.8.0/lib/python3.8/site-packages/celery/concurrency/asynpool.py", line 482, in _create_worker_process
    return super()._create_worker_process(i)
  File "/root/.pyenv/versions/3.8.0/envs/python-3.8.0/lib/python3.8/site-packages/billiard/pool.py", line 1141, in _create_worker_process
    on_ready_counter = self._ctx.Value('i')
  File "/root/.pyenv/versions/3.8.0/envs/python-3.8.0/lib/python3.8/site-packages/billiard/context.py", line 177, in Value
    from .sharedctypes import Value
  File "/root/.pyenv/versions/3.8.0/envs/python-3.8.0/lib/python3.8/site-packages/billiard/sharedctypes.py", line 10, in <module>
    import ctypes
  File "/root/.pyenv/versions/3.8.0/lib/python3.8/ctypes/__init__.py", line 7, in <module>
    from _ctypes import Union, Structure, Array
ModuleNotFoundError: No module named '_ctypes'
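Solution

_ctypes is a C extension that CPython only builds when the libffi headers are available at compile time, so a pyenv-built interpreter on a bare system silently skips it. A hedged fix, assuming that is what happened here: install the headers, then rebuild the interpreter.

# Debian/Ubuntu (on RHEL/CentOS: yum install libffi-devel)
sudo apt-get install -y libffi-dev
# rebuild so that _ctypes gets compiled this time
pyenv uninstall 3.8.0
pyenv install 3.8.0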

not enough values to unpack (expected 3, got 0)

Background

On Windows, running Celery and then calling res.get(timeout=10) raised the following error:

res.get(timeout=10)
Traceback (most recent call last):
  File "C:\Apps\JetBrains\PyCharm 2023.3.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
    coro = func()
  File "<input>", line 1, in <module>
  File "C:\Tools\miniconda3\envs\python3.8\lib\site-packages\celery\result.py", line 251, in get
    return self.backend.wait_for_pending(
  File "C:\Tools\miniconda3\envs\python3.8\lib\site-packages\celery\backends\asynchronous.py", line 223, in wait_for_pending
    return result.maybe_throw(callback=callback, propagate=propagate)
  File "C:\Tools\miniconda3\envs\python3.8\lib\site-packages\celery\result.py", line 365, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "C:\Tools\miniconda3\envs\python3.8\lib\site-packages\celery\result.py", line 358, in throw
    self.on_ready.throw(*args, **kwargs)
  File "C:\Tools\miniconda3\envs\python3.8\lib\site-packages\vine\promises.py", line 235, in throw
    reraise(type(exc), exc, tb)
  File "C:\Tools\miniconda3\envs\python3.8\lib\site-packages\vine\utils.py", line 27, in reraise
    raise value
ValueError: not enough values to unpack (expected 3, got 0)
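Solution

Celery's default prefork pool relies on fork semantics that Windows does not provide, and this ValueError is the usual symptom. Two common workarounds (general Celery-on-Windows advice, not specific to this project):

# 1. run the worker with a pool implementation that works on Windows
celery -A _celery worker -l INFO -P solo

# 2. or have billiard emulate forking before the worker starts
set FORKED_BY_MULTIPROCESSING=1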

Python Celery

Introduction

No long preamble; let's get straight to it.

Installation

Find the release on PyPI.

pip install celery==5.4.0
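
To sanity-check the install, a minimal app sketch (the module name tasks and the local Redis broker/backend URLs are assumptions):

# tasks.py
from celery import Celery

app = Celery('tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')


@app.task
def add(x, y):
    return x + y

Start a worker with celery -A tasks worker -l INFO, then call add.delay(2, 3).get(timeout=10) from another shell.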

ModuleNotFoundError

Background: I wanted to run a Python script from the command line, but got ModuleNotFoundError: No module named 'xxx'.

Project structure

py-spider
├── util
│   ├── request.py
│   ├── mysql_pool.py
│   ├── logger.py
│   └── __init__.py
├── spider
│   ├── spider_demo.py
│   └── __init__.py
└── main.py
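
The cause: when a script is launched directly (e.g. python spider/spider_demo.py), Python puts only the script's own directory on sys.path, so top-level packages such as util are not importable. Two hedged fixes, assuming spider_demo.py is the script that imports from util:

# option 1: run it as a module from the project root, which keeps the package layout importable
cd py-spider
python -m spider.spider_demo

# option 2: prepend the project root to sys.path at the top of spider/spider_demo.py
import os
import sys

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from util import request  # hypothetical import, mirroring the layout above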

Python Version Management Tools

Managing multiple Python versions and their virtual environments is a fundamental skill for any developer who switches between projects. From the official venv to the powerful pyenv and other third-party tools, the Python community offers a whole range of tools to simplify the process. This article is a comprehensive (and still growing) guide to using them.

conda

venv

venv is a standard-library module, built into Python since version 3.3, for creating virtual environments: it lets you quickly spin up clean, fully isolated interpreters for different projects.

venv itself cannot install Python versions; it reuses whatever Python is already on the machine. If you need environments based on other Python versions, reach for a dedicated version manager instead.
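
A minimal sketch of the day-to-day venv commands (nothing project-specific assumed):

# create a virtual environment in .venv using the interpreter on PATH
python -m venv .venv
# activate it on Linux/macOS ...
source .venv/bin/activate
# ... or on Windows
.venv\Scripts\activate
# leave the environment when done
deactivate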


Python Spider

Below are backups of some of my spiders.

Eastmoney

import datetime
import requests
from loguru import logger
from bs4 import BeautifulSoup
from apscheduler.schedulers.blocking import BlockingScheduler

headers = {
    'Accept': '*/*',
    'Accept-Language': 'zh-CN,zh;q=0.9,en-GB;q=0.8,en-US;q=0.7,en;q=0.6',
    'Connection': 'keep-alive',
    'Referer': 'https://data.eastmoney.com/report/industry.jshtml',
    'Sec-Fetch-Dest': 'script',
    'Sec-Fetch-Mode': 'no-cors',
    'Sec-Fetch-Site': 'same-site',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 '
                  'Safari/537.36',
    'sec-ch-ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'Host': 'reportapi.eastmoney.com'
}

# per-day counters keyed by report category; the Chinese keys are the category
# labels reused in report_uris/jg_uris below (个股研报 = stock reports,
# 行业研报 = industry reports, 策略报告 = strategy reports, 宏观研报 = macro reports)
records = {
    'day': '',
    '个股研报': 0,
    '行业研报': 0,
    '策略报告': 0,
    '宏观研报': 0,
}


def report():
    return {
        'id': 0,
        'category': '',
        'industry_code': '',
        'industry_name': '',
        'stock_code': '',
        'stock_name': '',
        'publish_date': '',
        'title': '',
        'summary': '',
        'pdf_url': '',
        'author': '',
        'org_code': '',
        'org_name': '',
        'org_s_name': '',
        'em_rating_name': '',
        'url': '',
        'seed_url': '',
        # industry name of the individual stock
        'indv_indu_name': '',
        # industry code of the individual stock
        'indv_indu_code': '',
    }


def fetch_eastmoney_report(url, category, begin, end, page_no):
    # fetch one page of data; headers must be passed as a keyword argument,
    # otherwise requests.get treats the second positional argument as params
    response = requests.get(url.format(begin, end, page_no), headers=headers)
    report_list = []
    total_page = response.json()['TotalPage']
    hits = response.json()['hits']
    record = {
        category: hits
    }
    records.update(record)
    logger.info(
        'url: {}, total pages: {}, current page: {}, hits: {}'.format(url.format(begin, end, page_no), total_page, page_no, hits))
    for data in response.json()['data']:
        report_info = report()
        report_info['category'] = category
        report_info['title'] = data['title']
        report_info['seed_url'] = url
        # stock code
        report_info['stock_code'] = data['stockCode']
        # stock name
        report_info['stock_name'] = data['stockName']
        # industry code
        report_info['industry_code'] = data['industryCode']
        # industry name
        report_info['industry_name'] = data['industryName']
        # key field for the follow-up detail-page request
        report_info['info_code'] = data['infoCode']
        # current rating
        report_info['em_rating_name'] = data['emRatingName']
        # author; several author fields exist, researcher is the string form
        report_info['author'] = data['researcher']
        # institution code
        report_info['org_code'] = data['orgCode']
        # institution full name
        report_info['org_name'] = data['orgName']
        report_info['indv_indu_name'] = data['indvInduName']
        report_info['indv_indu_code'] = data['indvInduCode']
        # institution short name
        report_info['org_s_name'] = data['orgSName']
        # publish date
        report_info['publish_date'] = data['publishDate']
        report_list.append(report_info)
    # release the connection
    response.close()
    # visit each report's detail page to fill in the remaining fields
    for report_info in report_list:
        # detail page URL
        report_detail_uri = 'https://data.eastmoney.com/report/zw_industry.jshtml?infocode={}' \
            .format(report_info['info_code'])
        detail_html = requests.get(report_detail_uri, headers=headers)

        soup = BeautifulSoup(detail_html.text, 'lxml')
        detail_html.close()
        pdf_url_a = soup.select('a.pdf-link')[0]
        summary = soup.select('div.ctx-content')[0]
        summary_text = summary.get_text()
        pdf_url = pdf_url_a.get('href')
        report_info['url'] = report_detail_uri
        report_info['pdf_url'] = pdf_url
        report_info['summary'] = summary_text
    if page_no < total_page:
        fetch_eastmoney_report(url, category, begin, end, page_no=page_no + 1)


def fetch_eastmoney_jg_report(url, category, begin, end, page_no):
    # fetch one page of data (headers passed as a keyword argument, as above)
    response = requests.get(url.format(begin, end, page_no), headers=headers)
    report_list = []
    total_page = response.json()['TotalPage']
    hits = response.json()['hits']
    logger.info(
        'url: {}, total pages: {}, current page: {}, hits: {}'.format(url.format(begin, end, page_no), total_page, page_no, hits))
    for data in response.json()['data']:
        report_info = report()
        report_info['category'] = category
        report_info['title'] = data['title']
        report_info['seed_url'] = url

        # key field for the follow-up detail-page request
        report_info['encode_url'] = data['encodeUrl']
        # author; several author fields exist, researcher is the string form
        report_info['author'] = data['researcher']
        # institution code
        report_info['org_code'] = data['orgCode']
        # institution full name
        report_info['org_name'] = data['orgName']
        # institution short name
        report_info['org_s_name'] = data['orgSName']
        # publish date
        report_info['publish_date'] = data['publishDate']

        report_list.append(report_info)
    response.close()
    # visit each report's detail page to fill in the remaining fields
    for report_info in report_list:
        # detail page URL
        report_detail_uri = 'https://data.eastmoney.com/report/zw_macresearch.jshtml?encodeUrl={}' \
            .format(report_info['encode_url'])
        detail_html = requests.get(report_detail_uri, headers=headers)
        soup = BeautifulSoup(detail_html.text, 'lxml')
        detail_html.close()
        pdf_url_a = soup.select('a.pdf-link')[0]
        summary = soup.select('div.ctx-content')[0]
        pdf_url = pdf_url_a.get('href')
        summary_text = summary.get_text()
        # trim the trailing risk-disclosure sections; note str.rfind returns -1
        # (which is truthy) when the marker is missing, so compare explicitly
        if summary_text.rfind('风险提示') != -1:
            summary_text = summary_text[0:summary_text.rfind('风险提示')]
        if summary_text.rfind('风险因素') != -1:
            summary_text = summary_text[0:summary_text.rfind('风险因素')]
        report_info['url'] = report_detail_uri
        report_info['pdf_url'] = pdf_url
        report_info['summary'] = summary_text
    if page_no < total_page:
        fetch_eastmoney_jg_report(url, category, begin, end, page_no=page_no + 1)


def eastmoney_spider():
    now = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
    today = datetime.datetime.today().strftime('%Y-%m-%d')
    logger.info("定时任务执行 at {}".format(now))
    report_uris = {
        # 行业研报,qType=1代表是行业研报
        '个股研报': 'https://reportapi.eastmoney.com/report/list?pageSize=100&beginTime={}&endTime={}&pageNo={}&qType=0',
        '行业研报': 'https://reportapi.eastmoney.com/report/list?pageSize=100&beginTime={}&endTime={}&pageNo={}&qType=1',
    }

    jg_uris = {
        '策略报告': 'https://reportapi.eastmoney.com/report/jg?pageSize=100&beginTime={}&endTime={}&pageNo={}&qType=2',
        '宏观研报': 'https://reportapi.eastmoney.com/report/jg?pageSize=100&beginTime={}&endTime={}&pageNo={}&qType=3',
    }

    for k, v in report_uris.items():
        fetch_eastmoney_report(v, k, today, today, 1)

    for k, v in jg_uris.items():
        fetch_eastmoney_jg_report(v, k, today, today, 1)


if __name__ == '__main__':
    eastmoney_spider()
    scheduler = BlockingScheduler()
    # cron fields: every day, every hour, at minute 0 (i.e. runs hourly)
    scheduler.add_job(eastmoney_spider, "cron", day='*', hour='*', minute='0')
    scheduler.start()

