from wordpress_xmlrpc import Client, WordPressPost
from wordpress_xmlrpc.methods.posts import GetPosts, NewPost, EditPost
from urllib.parse import urlparse
import frontmatter
import time
import os
from hashlib import md5, sha1
import json
import markdown
import re
import urllib.parse
# --- Module configuration -------------------------------------------------
# Pick the configuration file: "diy_config.txt" in the current working
# directory takes precedence over the default "config.txt".
config_file_txt = os.path.join(os.getcwd(), "diy_config.txt")
if not os.path.exists(config_file_txt):
    config_file_txt = os.path.join(os.getcwd(), "config.txt")

# The configuration file is parsed as JSON and must provide the three keys
# read below (a missing key raises KeyError).
config_info = {}
with open(config_file_txt, 'rb') as f:
    config_info = json.loads(f.read())

username = config_info["USERNAME"]
password = config_info["PASSWORD"]
xmlrpc_php = config_info["XMLRPC_PHP"]

# Environment variables, when set to a non-empty value, override the values
# from the configuration file.  Each variable is looked up independently so
# that one missing variable no longer prevents the others from being applied
# (the previous single try/bare-except aborted at the first missing name and
# also hid unrelated errors).
# NOTE(review): USERNAME may already be defined by the operating system on
# some platforms, in which case it overrides the configured value - confirm
# this is intended.
_env_overrides = {
    name: os.environ.get(name)
    for name in ("USERNAME", "PASSWORD", "XMLRPC_PHP")
}
if _env_overrides["USERNAME"]:
    username = _env_overrides["USERNAME"]
if _env_overrides["PASSWORD"]:
    password = _env_overrides["PASSWORD"]
if _env_overrides["XMLRPC_PHP"]:
    xmlrpc_php = _env_overrides["XMLRPC_PHP"]
if None in _env_overrides.values():
    # At least one variable is absent from the environment; the value from
    # the configuration file stays in effect for it.
    print("无法获取github的secrets配置信息,开始使用本地变量")

# The host part of the XML-RPC endpoint is reused to build post URLs.
url_info = urlparse(xmlrpc_php)
domain_name = url_info.netloc

# Shared XML-RPC client used by every function that talks to the server.
wp = Client(xmlrpc_php, username, password)

# Directory (relative to the current working directory) holding the
# Markdown files to synchronise.
md_dir_name = "chinesebqb-md"
# Fetch the id and link of the posts already present on the server.
def get_posts():
    """Return a list of ``{"id": ..., "link": ...}`` dicts for the server's posts.

    The posts are fetched through the module-level ``wp`` client.  The
    resulting list and its length are printed before being returned.
    """
    print(time.strftime('%Y-%m-%d-%H-%M-%S')+"开始从服务器获取文章列表...")
    # The very large 'number' is presumably meant to fetch every post in a
    # single call - TODO confirm against the server's limits.
    query = {'post_type': 'post', 'number': 1000000000}
    remote_posts = wp.call(GetPosts(query))
    post_link_id_list = [
        {"id": remote.id, "link": remote.link}
        for remote in remote_posts
    ]
    print(post_link_id_list)
    print(len(post_link_id_list))
    return post_link_id_list
# Build a WordPressPost object from plain values.
def create_post_obj(title, content, link, post_status, terms_names_post_tag, terms_names_category):
    """Return a ``WordPressPost`` populated with the given fields.

    Args:
        title: Post title.
        content: Post body.
        link: Value stored in the post's ``link`` attribute (also printed).
        post_status: Value stored in ``post_status``.
        terms_names_post_tag: Value stored under ``terms_names['post_tag']``.
        terms_names_category: Value stored under ``terms_names['category']``.

    Comments are always set to "open".
    """
    taxonomy_terms = {
        # Tags the post belongs to; created automatically if missing.
        'post_tag': terms_names_post_tag,
        # Categories the post belongs to; created automatically if missing.
        'category': terms_names_category,
    }

    post_obj = WordPressPost()
    post_obj.title = title
    post_obj.content = content
    post_obj.link = link
    post_obj.post_status = post_status
    post_obj.comment_status = "open"
    print(post_obj.link)
    post_obj.terms_names = taxonomy_terms
    return post_obj
# Create a new post.
def new_post(title, content, link, post_status, terms_names_post_tag, terms_names_category):
    """Create a post on the server, then update it with its final fields.

    The post is first created with ``link`` used as its title; the id
    returned by ``NewPost`` is then passed to ``edit_post`` together with the
    real ``title``.
    NOTE(review): presumably the first step exists so the server derives the
    permalink from ``link`` - confirm.
    """
    # Step 1: create the post (title temporarily set to the link) to get an id.
    placeholder_post = create_post_obj(
        link,
        content,
        link,
        post_status,
        terms_names_post_tag,
        terms_names_category,
    )
    post_id = wp.call(NewPost(placeholder_post))

    # Step 2: push the real fields through EditPost.
    edit_post(
        post_id,
        title,
        content,
        link,
        post_status,
        terms_names_post_tag,
        terms_names_category,
    )
# Update an existing post.
def edit_post(id, title, content, link, post_status, terms_names_post_tag, terms_names_category):
    """Send the given fields to the server for the post identified by ``id``.

    A fresh post object is built with ``create_post_obj`` and submitted via
    ``EditPost``; the value returned by the server call is printed.
    """
    updated_post = create_post_obj(
        title=title,
        content=content,
        link=link,
        post_status=post_status,
        terms_names_post_tag=terms_names_post_tag,
        terms_names_category=terms_names_category,
    )
    outcome = wp.call(EditPost(id, updated_post))
    print(outcome)
# Read the body and front matter of a Markdown file.
def read_md(file_path):
    """Return ``(content, metadata)`` parsed from the Markdown file.

    Args:
        file_path: Path of the Markdown file to read.

    Returns:
        A tuple of the post body and the front-matter metadata, as exposed by
        the object that ``frontmatter.load`` returns.  Both are also printed.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform default
    # encoding, which fails on non-ASCII Markdown under non-UTF-8 locales.
    with open(file_path, encoding='utf-8') as f:
        post = frontmatter.load(f)
    content = post.content
    metadata = post.metadata
    print("==>>", post.content)
    print("===>>", post.metadata)
    return (content, metadata)
# List the Markdown files found directly inside a directory.
def get_md_list(dir_path):
    """Return the paths of the entries in ``dir_path`` whose extension is ".md".

    Only the directory itself is scanned (no recursion) and the extension
    comparison is case-sensitive.  The resulting list is printed.
    """
    md_list = [
        os.path.join(dir_path, entry)
        for entry in os.listdir(dir_path)
        if os.path.splitext(entry)[1] == ".md"
    ]
    print(md_list)
    return md_list
# 计算sha1
def get_sha1(filename):
sha1_obj = sha1()
with open(filename, 'rb') as f:
sha1_obj.update(f.read())
result = sha1_obj.hexdigest()
print(result)
return result
# Write a dictionary to a file as JSON.
def write_dic_info_to_file(dic_info, file):
    """Serialise ``dic_info`` with ``json.dumps`` and write it to ``file``.

    Args:
        dic_info: JSON-serialisable object to store.
        file: Path of the file to (over)write.

    Returns:
        ``True`` once the data has been written.
    """
    dic_info_str = json.dumps(dic_info)
    # The context manager closes the handle even when the write fails, and
    # the ``file`` parameter is no longer rebound to the open handle.
    with open(file, 'w') as out:
        out.write(dic_info_str)
    return True
# Read a JSON file back into a dictionary.
def read_dic_from_file(file):
    """Return the object obtained by parsing the JSON text stored in ``file``.

    Args:
        file: Path of the JSON file to read.

    Raises:
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # The context manager guarantees the handle is closed even when parsing
    # raises; previously the explicit close() was skipped in that case.
    with open(file, 'r') as src:
        return json.loads(src.read())
# Load the recorded hash dictionary, creating the file when it is missing.
def get_md_sha1_dic(file):
    """Return the dictionary stored in ``file``.

    When ``file`` does not exist it is created with an empty JSON object and
    an empty dictionary is returned.
    """
    if os.path.exists(file):
        return read_dic_from_file(file)

    write_dic_info_to_file({}, file)
    return {}
# Rebuild the hash dictionary from the Markdown directory and store it.
def rebuild_md_sha1_dic(file, md_dir):
    """Recompute the SHA-1 record of every Markdown file in ``md_dir``.

    Each entry is keyed by the file's base name up to its first "." and
    stores the hash, that name, and its lower-cased percent-encoded form.
    An "update_time" entry with the current local time is added before the
    dictionary is written to ``file``.
    """
    md_sha1_dic = {}
    for md_path in get_md_list(md_dir):
        stem = os.path.basename(md_path).split(".")[0]
        digest = get_sha1(md_path)
        md_sha1_dic[stem] = {
            "hash_value": digest,
            "file_name": stem,
            "encode_file_name": urllib.parse.quote(stem, safe='').lower(),
        }

    md_sha1_dic["update_time"] = time.strftime('%Y-%m-%d-%H-%M-%S')
    write_dic_info_to_file(md_sha1_dic, file)
# Index the post list by link.
def post_link_id_list_2_link_id_dic(post_link_id_list):
    """Return a ``{link: id}`` dictionary built from ``{"id", "link"}`` dicts.

    When several entries share the same link, the last one wins.
    """
    return {entry["link"]: entry["id"] for entry in post_link_id_list}
# Build the "permanent address" footer appended to every post.
def href_info(link):
    """Return a Markdown snippet with a heading and a link pointing to ``link``.

    Args:
        link: URL used both as the link text and as the link target.
    """
    # The original literal was split across two physical lines, which is a
    # syntax error for a single-quoted string; the line break is kept as an
    # explicit leading "\n" so the literal is valid.
    # NOTE(review): confirm the leading newline matches the intended output.
    return "\n\n\n\n\n## 本文永久更新地址: \n[" + link + "](" + link + ")"
# Insert the post index into README.md (intended to help search engines
# discover the posts).
def insert_index_info_in_readme():
    """Regenerate the index between ``---start---`` and ``---end---`` in README.md.

    One Markdown link per file in the Markdown directory is generated (files
    sorted in reverse order), using the front-matter ``title`` as link text.
    README.md in the current working directory is read and rewritten as
    UTF-8.  The generated block and the new README content are printed.

    Returns:
        ``True`` once README.md has been rewritten.
    """
    md_list = get_md_list(os.path.join(os.getcwd(), md_dir_name))
    md_list.sort(reverse=True)

    # Collect the entries and join once instead of growing a string in a loop.
    index_entries = []
    for md in md_list:
        (content, metadata) = read_md(md)
        title = metadata.get("title", "")
        slug = os.path.basename(md).split(".")[0]
        # The f-string also copes with non-string titles (e.g. a purely
        # numeric front-matter title), which "+" concatenation rejected.
        index_entries.append(f"[{title}](https://{domain_name}/p/{slug}/)\n\n")

    # Block that replaces everything from ---start--- to ---end---.
    insert_info = (
        "---start---\n## 目录(" + time.strftime('%Y年%m月%d日') + "更新)" + "\n"
        + "".join(index_entries)
        + "---end---"
    )

    readme_path = os.path.join(os.getcwd(), "README.md")
    with open(readme_path, 'r', encoding='utf-8') as f:
        readme_md_content = f.read()
    print(insert_info)

    # A callable replacement inserts the text verbatim.  Passing the text as
    # a replacement template made re.sub interpret backslashes in titles as
    # escapes or group references.  DOTALL lets "." match newlines, matching
    # the same span as the previous "(.|\n)*" pattern.
    new_readme_md_content = re.sub(
        r'---start---.*---end---',
        lambda _match: insert_info,
        readme_md_content,
        flags=re.DOTALL,
    )

    with open(readme_path, 'w', encoding='utf-8') as f:
        f.write(new_readme_md_content)
    print("==new_readme_md_content==>>", new_readme_md_content)
    return True
def main():
    """Synchronise the Markdown directory with the WordPress site.

    Files whose SHA-1 matches the recorded value are skipped; every other
    file is rendered to HTML and either created or updated on the server,
    depending on whether its URL is already among the server's posts.  The
    hash record is rebuilt at the end.
    """
    # 1. Fetch the posts that already exist on the site, indexed by link.
    post_link_id_list = get_posts()
    print(post_link_id_list)
    link_id_dic = post_link_id_list_2_link_id_dic(post_link_id_list)
    print(link_id_dic)

    # 2. Load the recorded hashes from ".md_sha1" in the working directory
    #    (the file is created with {} when it does not exist yet).
    sha1_record_path = os.path.join(os.getcwd(), ".md_sha1")
    md_sha1_dic = get_md_sha1_dic(sha1_record_path)

    # 3. Synchronise every Markdown file of the source directory.
    md_list = get_md_list(os.path.join(os.getcwd(), md_dir_name))
    for md in md_list:
        # The record is keyed by the base name up to its first ".".
        sha1_key = os.path.basename(md).split(".")[0]
        sha1_value = get_sha1(md)

        unchanged = (
            sha1_key in md_sha1_dic
            and "hash_value" in md_sha1_dic[sha1_key]
            and sha1_value == md_sha1_dic[sha1_key]["hash_value"]
        )
        if unchanged:
            # Same hash as recorded: nothing to upload.
            print(md+"无需同步")
            continue

        # The hash differs (or is not recorded): read the file and sync it.
        (content, metadata) = read_md(md)
        # NOTE(review): the fallback for tags/categories is a plain string
        # (the domain name), not a list - confirm the server accepts that.
        link = urllib.parse.quote(sha1_key , safe='').lower()
        post_url = "https://"+domain_name+"/p/"+link+"/"
        post_fields = {
            "title": metadata.get("title", ""),
            "content": markdown.markdown(content + href_info(post_url), extensions=['tables', 'fenced_code']),
            "link": link,
            "post_status": "publish",
            "terms_names_post_tag": metadata.get("tags", domain_name),
            "terms_names_category": metadata.get("categories", domain_name),
        }

        if post_url not in link_id_dic:
            # No post with this URL on the server yet: create it.
            new_post(**post_fields)
            print("new_post==>>", post_fields)
        else:
            # A post with this URL exists: update it using its id.
            post_id = link_id_dic[post_url]
            edit_post(post_id, **post_fields)
            print("edit_post==>>", {"id": post_id, **post_fields})

    # 4. Rebuild the hash record so the next run skips unchanged files.
    rebuild_md_sha1_dic(sha1_record_path, os.path.join(os.getcwd(), md_dir_name))
    # 5. Writing the link index into README.md is currently disabled:
    # insert_index_info_in_readme()
# Script entry point: the synchronisation starts as soon as this file is
# executed (there is no __main__ guard, so importing the module runs it too).
main()