218 lines
6.3 KiB
Python
218 lines
6.3 KiB
Python
#!/usr/bin/env python
|
||
# coding: utf-8
|
||
|
||
from random import randint
|
||
import os, re
|
||
import requests
|
||
from bs4 import BeautifulSoup, Comment
|
||
from .tomd import Tomd
|
||
import requests
|
||
import json
|
||
import re
|
||
import cloudscraper
|
||
import time
|
||
|
||
|
||
def replace_chinese(text, old_chinese, new_chinese):
|
||
# 使用正则表达式匹配中文字符
|
||
pattern = re.compile(re.escape(old_chinese), re.IGNORECASE)
|
||
return pattern.sub(new_chinese, text)
|
||
def result_file(folder_username, file_name, folder_name):
|
||
folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", folder_name, folder_username)
|
||
if not os.path.exists(folder):
|
||
try:
|
||
os.makedirs(folder)
|
||
except Exception:
|
||
pass
|
||
path = os.path.join(folder, file_name)
|
||
file = open(path,"w")
|
||
file.close()
|
||
else:
|
||
path = os.path.join(folder, file_name)
|
||
return path
|
||
|
||
|
||
def get_headers(cookie_path:str):
|
||
cookies = {}
|
||
with open(cookie_path, "r", encoding="utf-8") as f:
|
||
cookie_list = f.readlines()
|
||
for line in cookie_list:
|
||
cookie = line.split(":")
|
||
cookies[cookie[0]] = str(cookie[1]).strip()
|
||
return cookies
|
||
|
||
|
||
def delete_ele(soup:BeautifulSoup, tags:list):
|
||
for ele in tags:
|
||
for useless_tag in soup.select(ele):
|
||
useless_tag.decompose()
|
||
|
||
|
||
def delete_ele_attr(soup:BeautifulSoup, attrs:list):
|
||
for attr in attrs:
|
||
for useless_attr in soup.find_all():
|
||
del useless_attr[attr]
|
||
|
||
|
||
def delete_blank_ele(soup:BeautifulSoup, eles_except:list):
|
||
for useless_attr in soup.find_all():
|
||
try:
|
||
if useless_attr.name not in eles_except and useless_attr.text == "":
|
||
useless_attr.decompose()
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
class CSDN(object):
|
||
def __init__(self, username, folder_name, cookie_path):
|
||
self.headers = get_headers(cookie_path)
|
||
self.s = requests.Session()
|
||
self.username = username
|
||
self.TaskQueue = list()
|
||
self.folder_name = folder_name
|
||
self.url_num = 1
|
||
|
||
def start(self):
|
||
num = 0
|
||
articles = [None]
|
||
while len(articles) > 0:
|
||
num += 1
|
||
url = u'https://blog.csdn.net/' + self.username + '/article/list/' + str(num)
|
||
scraper = cloudscraper.create_scraper() # returns a CloudScraper instance
|
||
response = scraper.get(url)
|
||
|
||
# response = self.s.get(url=url, headers=self.headers)
|
||
html = response.text
|
||
soup = BeautifulSoup(html, "html.parser")
|
||
articles = soup.find_all('div', attrs={"class":"article-item-box csdn-tracking-statistics"})
|
||
for article in articles:
|
||
article_title = article.a.text.strip().replace(' ',':')
|
||
article_href = article.a['href']
|
||
self.TaskQueue.append((article_title, article_href))
|
||
|
||
def get_md(self, url):
|
||
scraper = cloudscraper.create_scraper() # returns a CloudScraper instance
|
||
|
||
# response = self.s.get(url=url, headers=self.headers)
|
||
html = scraper.get(url).text
|
||
|
||
soup = BeautifulSoup(html, 'lxml')
|
||
content = soup.select_one("#mainBox > main > div.blog-content-box")
|
||
if(content == None):
|
||
return ""
|
||
print(str(content))
|
||
# 删除注释
|
||
for useless_tag in content(text=lambda text: isinstance(text, Comment)):
|
||
useless_tag.extract()
|
||
# 删除无用标签
|
||
tags = ["svg", "ul", ".hljs-button.signin"]
|
||
delete_ele(content, tags)
|
||
# 删除标签属性
|
||
attrs = ["class", "name", "id", "onclick", "style", "data-token", "rel"]
|
||
delete_ele_attr(content,attrs)
|
||
# 删除空白标签
|
||
eles_except = ["img", "br", "hr"]
|
||
delete_blank_ele(content, eles_except)
|
||
# 转换为markdown
|
||
print(str(content))
|
||
md = Tomd(str(content)).markdown
|
||
return md
|
||
|
||
def write_readme(self):
|
||
print("+"*100)
|
||
print("[++] 开始爬取 {} 的博文 ......".format(self.username))
|
||
print("+"*100)
|
||
reademe_path = result_file(self.username,file_name="README.md",folder_name=self.folder_name)
|
||
with open(reademe_path,'w', encoding='utf-8') as reademe_file:
|
||
readme_head = "# " + self.username + " 的博文\n"
|
||
reademe_file.write(readme_head)
|
||
|
||
self.TaskQueue.reverse()
|
||
for (article_title,article_href) in self.TaskQueue:
|
||
text = str(self.url_num) + '. [' + article_title + ']('+ article_href +')\n'
|
||
reademe_file.write(text)
|
||
|
||
self.url_num += 1
|
||
self.url_num = 1
|
||
|
||
|
||
|
||
def get_all_articles(self):
|
||
while len(self.TaskQueue) > 0:
|
||
(article_title,article_href) = self.TaskQueue.pop()
|
||
time.sleep(randint(10, 25))
|
||
|
||
file_name = re.sub(r'[\/::*?"<>|\n]','-', article_title) + ".md"
|
||
artical_path = result_file(folder_username=self.username, file_name=file_name, folder_name=self.folder_name)
|
||
article_title = article_title.replace('\n',' ')
|
||
article_title = article_title.replace('"',' ')
|
||
article_title = article_title.replace('\'',' ')
|
||
article_title = article_title.replace('\r',' ')
|
||
article_title = article_title.replace('\t',' ')
|
||
|
||
md_head = "# " + article_title + "\n"
|
||
|
||
md = self.get_md(article_href)
|
||
print(md)
|
||
while md == "":
|
||
time.sleep(randint(5, 25))
|
||
md = self.get_md(article_href)
|
||
|
||
md = '[引用自](www.csdn.net)\r\n ' + md_head + md
|
||
print("[++++] 正在处理URL:{}".format(article_href))
|
||
# https://www.testingcloud.club/sapi/api/article_tree
|
||
with open(artical_path, "w", encoding="utf-8") as artical_file:
|
||
artical_file.write(md)
|
||
requests.put("https://www.testingcloud.club/sapi/api/article_tree",json.dumps({
|
||
"title": (article_title),
|
||
"content":(md),
|
||
"spend_time":1,
|
||
"father":2500,
|
||
"level":1,
|
||
"author":"sds",
|
||
"is_public":1,
|
||
"author":"admin"
|
||
}))
|
||
md = md
|
||
self.url_num += 1
|
||
|
||
|
||
|
||
def spider(username: str, cookie_path:str, folder_name: str = "blog"):
|
||
if not os.path.exists(folder_name):
|
||
os.makedirs(folder_name)
|
||
csdn = CSDN(username, folder_name, cookie_path)
|
||
csdn.start()
|
||
csdn.write_readme()
|
||
csdn.get_all_articles()
|
||
|
||
|
||
def onearticle(href: str,cookie:str,folder_name: str = "blog"):
|
||
if not os.path.exists(folder_name):
|
||
os.makedirs(folder_name)
|
||
csdn = CSDN('username', folder_name, cookie)
|
||
md = csdn.get_md(href)
|
||
print(md)
|
||
while md == "":
|
||
time.sleep(randint(5, 25))
|
||
md = csdn.get_md(href)
|
||
|
||
print("[++++] 正在处理URL:{}".format(href))
|
||
# https://www.testingcloud.club/sapi/api/article_tree
|
||
# with open(artical_path, "w", encoding="utf-8") as artical_file:
|
||
# artical_file.write(md)
|
||
# requests.put("https://www.testingcloud.club/sapi/api/article_tree",json.dumps({
|
||
# "title": (article_title),
|
||
# "content":(md),
|
||
# "spend_time":1,
|
||
# "father":2500,
|
||
# "level":1,
|
||
# "author":"sds",
|
||
# "is_public":1,
|
||
# "author":"admin"
|
||
# }))
|
||
# md = md
|
||
# self.url_num += 1
|
||
|
||
|