csdn_spider/csdn/csdn.py

#!/usr/bin/env python
# coding: utf-8
import json
import os
import re
import time
from random import randint

import cloudscraper
import requests
from bs4 import BeautifulSoup, Comment

from .tomd import Tomd

def replace_chinese(text, old_chinese, new_chinese):
    # Replace a literal Chinese substring via a case-insensitive regex
    pattern = re.compile(re.escape(old_chinese), re.IGNORECASE)
    return pattern.sub(new_chinese, text)

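# Example (hypothetical strings): replace_chinese("示例标题", "标题", "title")
# returns "示例title". re.escape keeps any regex metacharacters in old_chinese
# literal, so this is effectively a plain substring replacement.
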
def result_file(folder_username, file_name, folder_name):
    # Build the output path <folder_name>/<folder_username>/<file_name> next to
    # this package, creating the directory and an empty file on first use.
    folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", folder_name, folder_username)
    if not os.path.exists(folder):
        try:
            os.makedirs(folder)
        except Exception:
            pass
        path = os.path.join(folder, file_name)
        open(path, "w", encoding="utf-8").close()
    else:
        path = os.path.join(folder, file_name)
    return path

def get_headers(cookie_path: str):
    # Read request headers (one "Name: value" pair per line) into a dict.
    cookies = {}
    with open(cookie_path, "r", encoding="utf-8") as f:
        for line in f:
            # Split on the first ":" only, so values (e.g. URLs) may contain colons.
            cookie = line.split(":", 1)
            cookies[cookie[0]] = cookie[1].strip()
    return cookies

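# A plausible cookie file, one header per line (values are placeholders, not
# real credentials):
#
#   User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64)
#   Cookie: UserName=someuser; UserToken=...
#
# get_headers() would turn those lines into
#   {"User-Agent": "Mozilla/5.0 ...", "Cookie": "UserName=someuser; ..."}
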
def delete_ele(soup: BeautifulSoup, tags: list):
    # Remove every element matching the given CSS selectors.
    for ele in tags:
        for useless_tag in soup.select(ele):
            useless_tag.decompose()

def delete_ele_attr(soup: BeautifulSoup, attrs: list):
    # Strip the given attributes from every element in the tree.
    for attr in attrs:
        for useless_attr in soup.find_all():
            del useless_attr[attr]

def delete_blank_ele(soup: BeautifulSoup, eles_except: list):
    # Remove elements with no text, except the whitelisted tag names.
    for useless_attr in soup.find_all():
        try:
            if useless_attr.name not in eles_except and useless_attr.text == "":
                useless_attr.decompose()
        except Exception:
            pass

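# Minimal sketch of how the three helpers compose (the markup is made up for
# illustration; nothing in the spider calls this function):
def _demo_cleanup():
    soup = BeautifulSoup('<div class="x"><svg></svg><p></p><p>hi</p></div>', "html.parser")
    delete_ele(soup, ["svg"])             # drop <svg> elements
    delete_ele_attr(soup, ["class"])      # strip class="..." everywhere
    delete_blank_ele(soup, ["br", "hr"])  # drop empty tags, keeping <br>/<hr>
    return str(soup)  # '<div><p>hi</p></div>'
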
class CSDN(object):
    def __init__(self, username, folder_name, cookie_path):
        self.headers = get_headers(cookie_path)
        self.s = requests.Session()
        self.username = username
        self.TaskQueue = list()  # (article_title, article_href) tuples
        self.folder_name = folder_name
        self.url_num = 1

    def start2(self):
        # Page through a hard-coded category listing until a page has no articles.
        num = 0
        articles = [None]
        while len(articles) > 0:
            num += 1
            url = 'https://xuesong.blog.csdn.net/category_8454447_' + str(num) + '.html'
            print(url)
            # url = 'https://' + self.username + '.blog.csdn.net/article/list/' + str(num)
            scraper = cloudscraper.create_scraper()  # returns a CloudScraper instance
            response = scraper.get(url, headers=self.headers)
            # response = self.s.get(url=url, headers=self.headers)
            html = response.text
            soup = BeautifulSoup(html, "html.parser")
            articles = soup.find_all('div', attrs={"class": "column_article_title"})
            for article in articles:
                article_title = article.text
                article_href = article.parent['href']
                self.TaskQueue.append((article_title, article_href))

    def start(self):
        # Page through the user's article list until an empty page, queueing
        # (title, href) tuples for get_all_articles().
        num = 0
        articles = [None]
        while len(articles) > 0:
            num += 1
            url = 'https://blog.csdn.net/' + self.username + '/article/list/' + str(num)
            scraper = cloudscraper.create_scraper()  # returns a CloudScraper instance
            response = scraper.get(url, headers=self.headers)
            # response = self.s.get(url=url, headers=self.headers)
            html = response.text
            soup = BeautifulSoup(html, "html.parser")
            articles = soup.find_all('div', attrs={"class": "article-item-box csdn-tracking-statistics"})
            for article in articles:
                article_title = article.a.text.strip().replace(' ', '')
                article_href = article.a['href']
                self.TaskQueue.append((article_title, article_href))

    def get_md(self, url):
        # Fetch one article page and convert its body to Markdown.
        scraper = cloudscraper.create_scraper()  # returns a CloudScraper instance
        # response = self.s.get(url=url, headers=self.headers)
        html = scraper.get(url, headers=self.headers).text
        soup = BeautifulSoup(html, 'lxml')
        content = soup.select_one("#mainBox > main > div.blog-content-box")
        if content is None:
            return "", soup.title.text
        # Remove HTML comments
        for useless_tag in content(text=lambda text: isinstance(text, Comment)):
            useless_tag.extract()
        # Remove useless tags
        tags = ["svg", "ul", ".hljs-button.signin"]
        delete_ele(content, tags)
        # Remove tag attributes
        attrs = ["class", "name", "id", "onclick", "style", "data-token", "rel"]
        delete_ele_attr(content, attrs)
        # Remove blank tags
        # eles_except = ["br", "hr"]
        # delete_blank_ele(content, eles_except)
        # Convert to Markdown
        md = Tomd(str(content)).markdown
        # Debug dump of the converted Markdown
        with open('test.html', 'w+', encoding='utf-8') as file:
            file.write(str(md))
        return md, soup.title.text

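    # get_md() returns ("", <page title>) when the article body cannot be
    # located, and callers treat the empty string as a retry signal
    # (see get_all_articles() and onearticle() below).
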
    def write_readme(self):
        print("+" * 100)
        print("[++] Start crawling blog posts of {} ......".format(self.username))
        print("+" * 100)
        readme_path = result_file(self.username, file_name="README.md", folder_name=self.folder_name)
        with open(readme_path, 'w', encoding='utf-8') as readme_file:
            readme_head = "# Blog posts of " + self.username + "\n"
            readme_file.write(readme_head)
            self.TaskQueue.reverse()  # oldest entries first in the README
            for (article_title, article_href) in self.TaskQueue:
                text = str(self.url_num) + '. [' + article_title + '](' + article_href + ')\n'
                readme_file.write(text)
                self.url_num += 1
        self.url_num = 1

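    # The README written above looks roughly like this (illustrative titles):
    #
    #   # Blog posts of someuser
    #   1. [First post](https://blog.csdn.net/someuser/article/details/...)
    #   2. [Second post](https://blog.csdn.net/someuser/article/details/...)
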
    def get_all_articles(self):
        # Drain the task queue: convert each article to Markdown, save it,
        # and upload it to the article_tree endpoint.
        while len(self.TaskQueue) > 0:
            (article_title, article_href) = self.TaskQueue.pop()
            time.sleep(randint(10, 25))  # throttle requests to avoid being blocked
            file_name = re.sub(r'[\/:*?"<>|\n]', '-', article_title) + ".md"
            article_path = result_file(folder_username=self.username, file_name=file_name, folder_name=self.folder_name)
            article_title = article_title.replace('\n', ' ')
            article_title = article_title.replace('"', ' ')
            article_title = article_title.replace('\'', ' ')
            article_title = article_title.replace('\r', ' ')
            article_title = article_title.replace('\t', ' ')
            md_head = "# " + article_title + "\n"
            md, article = self.get_md(article_href)
            while md == "":
                # An empty result usually means the fetch failed; retry after a pause.
                time.sleep(randint(5, 25))
                md, article = self.get_md(article_href)
            md = '[Quoted from](www.csdn.net)\r\n ' + md_head + md
            print("[++++] Processing URL {}".format(article_href))
            # https://www.testingcloud.club/sapi/api/article_tree
            with open(article_path, "w", encoding="utf-8") as article_file:
                article_file.write(md)
            requests.put("https://www.testingcloud.club/sapi/api/article_tree", json.dumps({
                "title": article_title,
                "content": md,
                "spend_time": 1,
                "father": 2500,
                "level": 1,
                "is_public": 1,
                "author": "admin"  # the original literal listed "author" twice; the last value wins
            }))
            self.url_num += 1

def spider(username: str, cookie_path: str, folder_name: str = "blog"):
    # Crawl every article of one user: build the list, write the README,
    # then convert and upload the articles.
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)
    csdn = CSDN(username, folder_name, cookie_path)
    csdn.start()
    csdn.write_readme()
    csdn.get_all_articles()

def onearticle(href: str, cookie: str, folder_name: str = "blog"):
    # Convert a single article by URL and upload it.
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)
    csdn = CSDN('username', folder_name, cookie)
    md, title = csdn.get_md(href)
    while md == "":
        time.sleep(randint(5, 25))
        md, title = csdn.get_md(href)  # get_md returns (markdown, title)
    print("[++++] Processing URL {}".format(href))
    # https://www.testingcloud.club/sapi/api/article_tree
    requests.put("https://www.testingcloud.club/sapi/api/article_tree", json.dumps({
        "title": title,
        "content": md,
        "spend_time": 1,
        "father": 2500,
        "level": 1,
        "is_public": 1,
        "author": "admin"  # the original literal listed "author" twice; the last value wins
    }))

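# Minimal usage sketch ("someuser" and "cookie.txt" are placeholders). Because
# of the relative import of Tomd, run this through the package (python -m ...)
# rather than as a standalone file.
if __name__ == "__main__":
    spider("someuser", "cookie.txt", folder_name="blog")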