要使用python对wordpress网站进行批量文章发布,最简单的方法是使用Python xmlrpc,详细用法可以见这里:http://www.snailtoday.com/archives/13599
不过,有时候碰到一些特殊的需求,使用Python xmlrpc无法满足我们的需求,这时需要直接操作wordpress数据库。
它需要牵涉到wordrpess的下面四张表,

整个逻辑学是这样的:
1.wp_posts生成文章的ID,
2.wp_terms这个表中生成term_id
3.将term_id写入到wp_term_taxonomy这个表中,获得term_taxonomy_id,如果原来的term_id已经在这个表中,那么就不要插入,直接查询它的term_taxonomy_id就可以了。
4.将文章id,term_taxonomy_id写入到wp_term_relationships这张表中,实现文章与标签的连接,一个文章id可以对应多个term_taxonomy_id。
参考代码:
# coding:utf-8
"""
这个脚本本地测试用
"""
import pymysql
import datetime,time,re,sys,random
import requests
import json
import traceback
from google_translate import Translate
from my_logging import Mylogging
from slugify import slugify #生成slug
mylog = Mylogging()
con = pymysql.connect(host='localhost',port=3306, user='root', password='', database='stackover_for_test', charset='utf8')
cursor = con.cursor()
con_server = pymysql.connect(host='localhost',port=3306, user='wp04', password='', database='wp04', charset='utf8')
cursor_server = con_server.cursor()
import sqlite3
#记录写入的数据
connection=sqlite3.connect('stackoverflow.db')
print('Opened database successfully')
cur = connection.cursor()
cur.execute('CREATE TABLE IF NOT EXISTS posted_to_db(id INTEGER PRIMARY KEY,posted_ID INT,post_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL)') #插入有逗号的内容,用两个单引号替换一个单引号
print('Table created successfully')
connection.commit()
updatetime=time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
def sql_w(sql,val):
#向数据库写入数据,并返回最后一条数据记录的id。
cur = con_server.cursor()
try:
cur.execute(sql,val)
tag_id = cur.lastrowid
con_server.commit()
except Exception as e:
con_server.rollback()
mylog.write_log("写入数据库报错Error!!!,报错信息为{}".format(e))
tag_id = 0
mylog.write_log(e)
mylog.write_log("报错的SQL语句为:{}".format(sql))
# sys.exit()
return tag_id
def sql_r_num(sql):
#查询一条数据,并返回这条数据。
cur = con.cursor()
cur.execute(sql)
data = cur.fetchone()
return data[0]
def get_data_from_db(stackoverflow_id):
#通过ID从数据库中找到问题的相关数据
cursor.execute('''select * from Posts where id=%s and PostTypeId = 1''' % (stackoverflow_id)) #PostTypeId = 1 代表问题
results = cursor.fetchall()
if len(results) > 0:
return results
else:
return None
def get_answers_from_db(stackoverflow_id):
#通过问题的id,在post表中找到答案的那些记录,并返回。
cursor.execute('''select * from Posts where ParentId = %s''' % (stackoverflow_id))
answer_results = cursor.fetchall()
return answer_results
def split_tags(tags):
#正则拆分tags,将“<svn><tortoisesvn><branch>” 变成['svn','branch']并返回。
req = r'<(.*?)>' #括号内的是要抓取的内容。如果没找到,返回[]
content = re.findall(req,tags) #不加[0]是列表
if len(content) >0:
return content
else:
return None
def get_tag_result(taglist):
"""插入tgas,并返回tags的id"""
tag_result_list = []
for tag in taglist:
tag_id = insert_tag(tag)
if tag_id != 0:
tag_result_list.append(tag_id)
# print("success!")
return tag_result_list
def get_code(body):
#利用正则获取文章中的代码,并存入到一个列表中。
#将文章中的代码替换为<code>1</code>的形式.
req_code = r'<code>.*?</code>'
req_code = re.compile(req_code,re.S) #多行匹配
mycode = re.findall(req_code,body)#不加[0]是列表
# my_dict = {}
num = 1
my_list = []
for i in mycode:
new_code = "<code>{}</code>".format(num)
my_list.append(i)
body = body.replace(i,new_code)
num+=1
return my_list, body
def recover(my_list,body):
#将翻译后的文章,代码替换回来
num = 1
for i in my_list:
new_code = "<code>{}</code>".format(num)
body = body.replace(new_code,i)
num += 1
return body
def insert_tag(tag):
#先判断ask_tags表中是否已存在tag,不存在则将tag插入到表中并返回插入后的ID,存在则返回tag的ID.
cursor_server.execute("select * from wp_terms where name = '%s'" %(tag))
results = cursor_server.fetchone()
if results == None:
mylog.write_log('数据库中没有此tag的记录')
sql_users = '''INSERT INTO wp_terms (name,slug,term_group) VALUES ("%s","%s",%s)''' % (
tag,#name
tag,#slug
0,#term_group
)
print("xxxxxxxxxxx",sql_users)
try:
cursor_server.execute(sql_users)
tag_id = cursor_server.lastrowid
con_server.commit()
mylog.write_log('tag写入数据库成功!ID为{}'.format(tag_id))
except Exception as e:
con_server.rollback()
traceback.print_exc()
mylog.write_log('tag写入数据库失败!内容为{}'.format(e))
tag_id = 0
return tag_id
else:
mylog.write_log("数据库中已经有此tag的记录,id为{}".format(results[0]))
return results[0]
def insert_wp_term_relationships(articleid,tagid):
#将文章id,tagid写入到wp_term_relationships关联表
for tag in tagid:
sql ="INSERT INTO wp_term_relationships(object_id,term_taxonomy_id,term_order) VALUES (%s,%s,%s)"%(articleid,tag,0) #最前面的引号要变成双引号,ignore表示忽略重复数据,不过先要设定索引
print("----------",sql)
try:
cursor_server.execute(sql)
tag_id = cursor_server.lastrowid
con_server.commit()
mylog.write_log('articleid为{}, tagid为{},写入数据库成功!ID为{}'.format(articleid,tag,tag_id))
except:
cursor_server.rollback()
traceback.print_exc()
mylog.write_log('articleid, tagid写入数据库失败!')
tag_id = 0
def insert_wp_term_taxonomy(tagid):
#将tagid写入到wp_term_taxonomy表
my_list = []
for tag in tagid:
#先判断wp_term_taxonomy表中是否已存在tag,不存在则将tag插入到表中并返回插入后的ID,存在则返回tag的ID.
cursor_server.execute("select * from wp_term_taxonomy where term_id = '%s'" %(tag))
results = cursor_server.fetchone()
if results == None:
mylog.write_log('数据库中没有此tag的记录')
sql ="INSERT INTO wp_term_taxonomy(term_id,taxonomy,description,parent,count) VALUES (%s,'%s','','0','1')"%(tag,'question_tag') #最前面的引号要变成双引号,ignore表示忽略重复数据,不过先要设定索引
print("----------",sql)
try:
cursor_server.execute(sql)
tag_id = cursor_server.lastrowid
con_server.commit()
my_list.append(tag_id)
mylog.write_log('term_id{},写入数据库成功!ID为{}'.format(tag,tag_id))
except:
cursor_server.rollback()
traceback.print_exc()
mylog.write_log('tagid, tagid写入数据库失败!')
tag_id = 0
return my_list
def get_id_from_db(question_id):
#查看数据库中是否已经存在url
cur.execute('''select * from posted_to_db where posted_ID='%s' ''' % (question_id))
results = cur.fetchall()
if len(results) > 0:
return results
else:
return None
def insert_result_to_db(question_id):
#将发布的问题的id写入本地数据库,备查。
sql ="INSERT INTO posted_to_db(posted_ID) VALUES (%s)" % (question_id)
# cur = con.cursor()
try:
cur.execute(sql)
tag_id = cur.lastrowid
connection.commit()
mylog.write_log("将发布成功后的问题id插入本地数据库成功,问题ID为{}".format(question_id))
except Exception as e:
connection.rollback()
mylog.write_log('id为{}写入数据库失败!'.format(question_id))
mylog.write_log("写入数据库报错Error!!!")
mylog.write_log(e)
mylog.write_log("报错的SQL语句为:{}".format(sql))
tag_id = 0
# sys.exit()
return tag_id
def insert_question(db):
#将问题写入数据库
for row in db:
answerCount =row[2]
body = row[3]
OwnerUserId = row[13] #作者ID
tags = row[17]
tags = split_tags(tags) #转化成列表
tag_id_result = get_tag_result(tags)
mylog.write_log("文章的tags为{}".format(tags))
body =body.replace('"','\'')
body =body.replace("<blockquote>","<blockquote class ='notranslate'>")
my_list = get_code(body)[0]
body = get_code(body)[1]
t1 = Translate(body)
body = t1.get_chinese_conetent()
body = body.replace("</ a>","</a>")
time.sleep(5)
body = recover(my_list,body)
title = row[18]
title = title.replace("('","")
title = title.replace(",)","")
# title = title.replace('"','\''),#title
mylog.write_log("原标题是:{}".format(title))
slug = slugify(title)
t2 = Translate(title)
title = t2.get_chinese_conetent()
mylog.write_log("新标题是:{}".format(title))
answer_results = get_answers_from_db(stackoverflow_id)
num = 1
if answer_results:
final_answer = ""
for row in answer_results:
if num < 11:
answerCount =row[2]
answer = row[3]
my_list = get_code(answer)[0] #将文章中的代码块取出来
answer = get_code(answer)[1] #将文章中的body取出来
t3 = Translate(answer)
answer = t3.get_chinese_conetent() #将body内容翻译成中文
answer = answer.replace("</ a>","</a>")
if len(answer) > 0:
answer = recover(my_list,answer) #如果调用翻译接口没有出错,有返回结果,则将body内容中的代码还原回去。
body = body + "<h5>answer</h5>" + answer
print("新建的回答内容是:",body)
print("*"*180)
time.sleep(35)
num +=1
mylog.write_log("答案获取完毕!")
mylog.write_log("-"*80)
# else:
if title is not None:
sql_ask_questions = '''INSERT INTO wp_posts (post_author,post_date,post_date_gmt,post_content,post_title,post_excerpt,post_status,comment_status,ping_status,post_password,post_name,to_ping,pinged,post_modified,post_modified_gmt,post_content_filtered,post_parent,guid,menu_order,post_type,post_mime_type,comment_count) VALUES (%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
val = (
1,#post_author
str(updatetime),#post_date
str(updatetime),#post_date_gmt
str(body),#post_content
str(title),#post_title
'',#post_excerpt
'publish',#post_status
'open',#comment_status
'open',#ping_status
'',#post_password
str(slug),#post_name
'',#to_ping
'',#pinged
str(updatetime),#post_modified
str(updatetime),#post_modified_gmt
'',#post_content_filtered
0,#post_parent
'http://localhost/wp04/questions/question//',#guid
0,#menu_order
'question',#post_type
'',#post_mime_type
0,#comment_count
)
question_id = sql_w(sql_ask_questions,val) #写入问题
print("question id is ..............",question_id)
if question_id == 0:
print("questio id 为0,不插入啦!!!!!!!!!!!!!!!------------------------------")
else:
term_taxonomy_id_list = insert_wp_term_taxonomy(tag_id_result)
insert_wp_term_relationships(question_id,term_taxonomy_id_list)
mylog.write_log("问题写入完毕!")
mylog.write_log("写入问题的ID为{}".format(question_id))
mylog.write_log("*"*70)
else:
question_id = 0
return (title)
begin_num = int(sys.argv[1])
end_num = int(sys.argv[2])
for i in range(begin_num,end_num):
mylog.write_log("正在处理id为{}的记录......".format(i))
stackoverflow_id = i
db_record = get_id_from_db(stackoverflow_id)
if db_record is None:
mylog.write_log("开始抓取id为{}的数据".format(stackoverflow_id))
db_results = get_data_from_db(stackoverflow_id)
if db_results is not None:
insert_question_results = insert_question(db_results)
else:
mylog.write_log("id为{}的数据不存在。".format(i))
else:
mylog.write_log('当前id为{}的文章之前已经发布过,略过......'.format(stackoverflow_id))
mylog.write_log('-'*70)
connection.close()
cursor_server.close()
cursor.close()