#!/usr/bin/env python
# coding: utf-8
"""Crawl CSDN blog articles, convert them to Markdown, save and upload them."""

import json
import os
import re
import time
from random import randint

import cloudscraper
import requests
from bs4 import BeautifulSoup, Comment

from .tomd import Tomd

# Endpoint every converted article is PUT to.
_ARTICLE_TREE_API = "https://www.testingcloud.club/sapi/api/article_tree"
# Seconds to wait for the upload endpoint before giving up.
_UPLOAD_TIMEOUT = 30
# How many times an article is re-fetched when it comes back empty.
_MAX_FETCH_RETRIES = 5
# Characters replaced by a space in article titles before they are used as
# the Markdown heading / upload title.
_TITLE_CLEANUP = str.maketrans({ch: ' ' for ch in '\n"\'\r\t'})


def replace_chinese(text, old_chinese, new_chinese):
    """Replace every occurrence of ``old_chinese`` in ``text``.

    ``old_chinese`` is matched literally (it is regex-escaped) and
    case-insensitively. ``new_chinese`` is handed to ``re.sub`` as the
    replacement, so backslash escapes in it are interpreted by ``re``.

    Args:
        text: String to search.
        old_chinese: Literal substring to replace.
        new_chinese: Replacement passed to ``Pattern.sub``.

    Returns:
        The substituted string, or ``text`` unchanged if ``re`` reports an
        error (the error is printed).

    Raises:
        ValueError: If ``old_chinese`` is not a ``str``.
    """
    if not isinstance(old_chinese, str):
        raise ValueError("old_chinese must be a string")
    try:
        pattern = re.compile(re.escape(old_chinese), re.IGNORECASE)
        return pattern.sub(new_chinese, text)
    except re.error as e:
        print(f"Regex error: {e}")
        return text


def result_file(folder_username, file_name, folder_name):
    """Return the output path for ``file_name``, creating its folder if needed.

    The folder is ``<this file's dir>/../<folder_name>/<folder_username>``.
    When the folder did not exist yet it is created and an empty file is
    written at the returned path; when it already exists nothing is touched.

    Args:
        folder_username: Sub-folder name (the blog user name).
        file_name: Name of the file inside that folder.
        folder_name: Name of the top-level output folder.

    Returns:
        The joined path to ``file_name``.
    """
    folder = os.path.join(
        os.path.dirname(os.path.realpath(__file__)),
        "..",
        folder_name,
        folder_username,
    )
    path = os.path.join(folder, file_name)
    if not os.path.exists(folder):
        # exist_ok guards against the folder appearing between the check and
        # the creation.
        os.makedirs(folder, exist_ok=True)
        with open(path, "w"):
            pass
    return path


def get_headers(cookie_path: str):
    """Parse a ``name:value``-per-line file into a dict.

    Each line is split on its FIRST colon only, so values that themselves
    contain ``:`` are kept whole. Lines without a colon (e.g. blank lines)
    are skipped.

    Args:
        cookie_path: Path to the UTF-8 text file to read.

    Returns:
        Dict mapping each stripped name to its stripped value.
    """
    cookies = {}
    with open(cookie_path, "r", encoding="utf-8") as f:
        for line in f:
            name, sep, value = line.partition(":")
            name = name.strip()
            if not sep or not name:
                continue
            cookies[name] = value.strip()
    return cookies


def delete_ele(soup: BeautifulSoup, tags: list):
    """Remove every element matching any CSS selector in ``tags`` (in place)."""
    for selector in tags:
        for useless_tag in soup.select(selector):
            useless_tag.decompose()


def delete_ele_attr(soup: BeautifulSoup, attrs: list):
    """Delete each attribute named in ``attrs`` from every descendant tag (in place)."""
    for tag in soup.find_all():
        for attr in attrs:
            del tag[attr]


def delete_blank_ele(soup: BeautifulSoup, eles_except: list):
    """Remove descendant elements whose text is the empty string.

    Args:
        soup: The BeautifulSoup object to process; it is modified in place.
        eles_except: Tag names that must not be removed.

    Returns:
        None.
    """
    for tag in soup.find_all():
        try:
            if tag.name not in eles_except and tag.text == "":
                tag.decompose()
        except Exception:
            # Best effort: the result list can still hold descendants of an
            # element that was already decomposed, and touching those may
            # raise. Such elements are gone from the tree anyway.
            continue


def _upload_article(title, content, father=2500):
    """PUT one article to the article_tree endpoint and return the response."""
    payload = {
        "title": title,
        "content": content,
        "spend_time": 1,
        "father": father,
        "level": 1,
        "author": "admin",
        "is_public": 1,
    }
    return requests.put(
        _ARTICLE_TREE_API, json.dumps(payload), timeout=_UPLOAD_TIMEOUT
    )


class CSDN(object):
    """Collects article links of a CSDN blog and converts articles to Markdown."""

    def __init__(self, username, folder_name, cookie_path):
        # Parsed name/value pairs from the cookie file; sent as request headers.
        self.headers = get_headers(cookie_path)
        self.s = requests.Session()
        self.username = username
        # List of (title, href) tuples still to be processed.
        self.TaskQueue = list()
        self.folder_name = folder_name
        self.url_num = 1

    def _fetch_html(self, url):
        """Fetch ``url`` through a fresh cloudscraper instance and return the body text."""
        scraper = cloudscraper.create_scraper()
        return scraper.get(url, headers=self.headers).text

    def _get_md_with_retry(self, url, max_retries=_MAX_FETCH_RETRIES):
        """Call ``get_md`` until it yields content or ``max_retries`` is used up.

        Sleeps a random 5-25 seconds before each retry.

        Returns:
            The ``(markdown, title)`` tuple of the last attempt; ``markdown``
            is ``""`` when every attempt came back empty.
        """
        md, title = self.get_md(url)
        retries = 0
        while md == "" and retries < max_retries:
            time.sleep(randint(5, 25))
            md, title = self.get_md(url)
            retries += 1
        return md, title

    def start2(self):
        """Queue the articles listed on the blog home page.

        The page is fetched once: its URL does not depend on a page number,
        so requesting it in a loop would never terminate.
        """
        # NOTE(review): this URL is hard-coded and ignores self.username --
        # confirm whether that is intended.
        url = u'https://lanceli.blog.csdn.net/?type=blog'
        print(url)
        soup = BeautifulSoup(self._fetch_html(url), "html.parser")
        for article in soup.find_all('article', attrs={"class": "blog-list-box"}):
            article_title = article('h4')[0].text
            article_href = article('a')[0]['href']
            self.TaskQueue.append((article_title, article_href))

    def start(self):
        """Queue every article of ``self.username``.

        Walks the numbered article-list pages starting at 1 and stops at the
        first page that contains no article entries.
        """
        num = 0
        while True:
            num += 1
            url = u'https://blog.csdn.net/' + self.username + '/article/list/' + str(num)
            soup = BeautifulSoup(self._fetch_html(url), "html.parser")
            articles = soup.find_all(
                'div',
                attrs={"class": "article-item-box csdn-tracking-statistics"},
            )
            if not articles:
                break
            for article in articles:
                article_title = article.a.text.strip().replace(' ', ':')
                article_href = article.a['href']
                self.TaskQueue.append((article_title, article_href))

    def get_md(self, url):
        """Download one article page and convert its content box to Markdown.

        Args:
            url: Address of the article page.

        Returns:
            ``(markdown, page_title)``. ``markdown`` is ``""`` when the
            content box is not found; ``page_title`` is ``""`` when the page
            has no ``<title>``.
        """
        soup = BeautifulSoup(self._fetch_html(url), 'lxml')
        title = soup.title.text if soup.title is not None else ""
        content = soup.select_one("#mainBox > main > div.blog-content-box")
        if content is None:
            return "", title

        # Drop HTML comments.
        for comment in content.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()

        # Drop useless tags.
        delete_ele(content, ["svg", "ul", ".hljs-button.signin"])

        # Drop tag attributes.
        delete_ele_attr(
            content,
            ["class", "name", "id", "onclick", "style", "data-token", "rel"],
        )

        # Convert to Markdown.
        md = Tomd(str(content)).markdown

        # NOTE(review): writes the converted text to ./test.html on every
        # call; looks like a debugging aid -- confirm before removing.
        with open('test.html', 'w', encoding='utf-8') as dump_file:
            dump_file.write(str(md))
        return md, title

    def write_readme(self):
        """Write README.md with a numbered link list of the queued articles.

        Side effect: ``self.TaskQueue`` is reversed in place before listing.
        ``self.url_num`` is 1 again when the method returns.
        """
        print("+"*100)
        print("[++] 开始爬取 {} 的博文 ......".format(self.username))
        print("+"*100)
        readme_path = result_file(
            self.username, file_name="README.md", folder_name=self.folder_name
        )
        with open(readme_path, 'w', encoding='utf-8') as readme_file:
            readme_file.write("# " + self.username + " 的博文\n")
            self.TaskQueue.reverse()
            numbered = enumerate(self.TaskQueue, start=self.url_num)
            for index, (article_title, article_href) in numbered:
                readme_file.write(
                    str(index) + '. [' + article_title + '](' + article_href + ')\n'
                )
        self.url_num = 1

    def get_all_articles(self):
        """Fetch, save and upload every queued article, emptying the queue.

        Sleeps a random 10-25 seconds before each article. An article that
        is still empty after the bounded retries is reported and skipped
        instead of being retried forever.
        """
        while self.TaskQueue:
            article_title, article_href = self.TaskQueue.pop()
            time.sleep(randint(10, 25))

            # Characters that are not allowed in file names become '-'.
            file_name = re.sub(r'[\/::*?"<>|\n]', '-', article_title) + ".md"
            artical_path = result_file(
                folder_username=self.username,
                file_name=file_name,
                folder_name=self.folder_name,
            )

            article_title = article_title.translate(_TITLE_CLEANUP)
            md_head = "# " + article_title + "\n"

            md, _ = self._get_md_with_retry(article_href)
            if md == "":
                print("[----] Skipping {}: no content after {} retries".format(
                    article_href, _MAX_FETCH_RETRIES))
                continue

            md = '[引用自](www.csdn.net)\r\n ' + md_head + md
            print("[++++] 正在处理URL:{}".format(article_href))
            with open(artical_path, "w", encoding="utf-8") as artical_file:
                artical_file.write(md)
            _upload_article(article_title, md)
            self.url_num += 1


def spider(username: str, cookie_path: str, folder_name: str = "blog"):
    """Start the CSDN blog crawler.

    Args:
        username: CSDN user name.
        cookie_path: Path of the file holding the CSDN login cookie.
        folder_name: Name of the folder articles are saved to; defaults to
            "blog".

    Steps:
        1. Create the output folder if it is missing.
        2. Build the CSDN crawler object.
        3. Collect the article links.
        4. Write the README file.
        5. Fetch every article.
    """
    os.makedirs(folder_name, exist_ok=True)
    csdn = CSDN(username, folder_name, cookie_path)
    csdn.start()
    csdn.write_readme()
    csdn.get_all_articles()


def onearticle(href: str, cookie: str, folder_name: str = "blog"):
    """Fetch a single article and upload it under parent node 2500.

    Args:
        href: Address of the article page.
        cookie: Path of the cookie file (passed to ``CSDN`` as ``cookie_path``).
        folder_name: Folder created if missing; defaults to "blog".
    """
    onearticlewith(href, cookie, father=2500, folder_name=folder_name)


def onearticlewith(href: str, cookie: str, father: int = 2500, folder_name: str = "blog"):
    """Fetch a single article and upload it under parent node ``father``.

    The article is re-fetched a bounded number of times while it comes back
    empty; if it is still empty, a message is printed and nothing is
    uploaded.

    Args:
        href: Address of the article page.
        cookie: Path of the cookie file (passed to ``CSDN`` as ``cookie_path``).
        father: Value sent as the ``father`` field of the upload.
        folder_name: Folder created if missing; defaults to "blog".
    """
    os.makedirs(folder_name, exist_ok=True)
    csdn = CSDN('username', folder_name, cookie)
    md, title = csdn._get_md_with_retry(href)
    if md == "":
        print("[----] Skipping {}: no content after {} retries".format(
            href, _MAX_FETCH_RETRIES))
        return
    print("[++++] 正在处理URL:{}".format(href))
    _upload_article(title, md, father)