谢乾坤 | Kingname

给时光以生命。

最近实习工作任务比较重,又在做数据挖掘的教学工作,同时还在做毕设,所以博客更新比较慢。不过最近肯定会有大动作。

闲话毕,转正题。在制作新浪微博模拟登录的部分时,遇到了一些问题。

我使用requests获取了新浪微博的源代码,通过lxml库的etree.HTML来处理一段网页源代码,从而生成一个可以被xpath解析的对象。

1
selector = etree.HTML(html)

遇到报错:

1
2
3
4
selector = etree.HTML(html)
File "lxml.etree.pyx", line 2953, in lxml.etree.HTML (src\lxml\lxml.etree.c:66734)
File "parser.pxi", line 1780, in lxml.etree._parseMemoryDocument (src\lxml\lxml.etree.c:101591)
ValueError: Unicode strings with encoding declaration are not supported. Please use bytes input or XML fragments without declaration.

根据报错信息推测,可能是因为不支持编码声明的Unicode字符串。Google发现这个问题在2012年就已经有人提交给作者了,但是一直没有被修复。地址在->https://gist.github.com/karlcow/3258330

不过下面的人也给出了解决办法:

1
2
html = bytes(bytearray(html, encoding='utf-8'))
selector = etree.HTML(html)

首先将源代码转化成比特数组,然后再将比特数组转化成一个比特对象。这样就可以绕过这个bug。

然而,又有人认为这不是一个bug, 所以一直没有被修复。这是由于,我获取源代码是使用r.text

1
html = requests.get('xxxxxx',cookies=cookies).text

而如果使用r.content:

1
html = requests.get('xxxxxx',cookies=cookies).content

就不会报错。

那r.text与r.content有什么区别呢?分析requests的源代码发现,r.text返回的是Unicode型的数据,而使用r.content返回的是bytes型的数据。也就是说,在使用r.content的时候,他已经只带了

1
html = bytes(bytearray(html, encoding='utf-8'))

这样一个转化了。

最近CentOS都声明放弃Python2了,编码问题确实浪费了很多时间,等空下来转Python3吧~

Python的单元测试(一)中,我们讲了单元测试的概念以及一个简单的单元测试例子。

在这个例子中,只有三个函数,于是可以把每个函数的输出结果打印到屏幕上,再用肉眼去看结果是否符合预期。然而假设有一个程序,有二十个类,每个类又有几十个函数,有些函数的输出结果还多达几十行,在这种情况下,肉眼如何看得出?

当然你可以使用if判断

1
2
3
4
if 输出结果 == 预期结果:
return True
else:
print u'不相等'

这个时候,你发现,程序有几个函数,后三行就要重复几次,本着代码简洁的原则,你把这个判断的过程写到一个函数中:

1
2
3
4
5
def isequal(output,right_output):
if output == right_output:
return True
else:
print u'不相等'

那么恭喜你,你步入正规了,然而,这一切已经有人为你做好了。欢迎unittest模块出场。

unittest supports test automation, sharing of setup and shutdown code for tests, aggregation of tests into collections, and independence of the tests from the reporting framework. The unittest module provides classes that make it easy to support these qualities for a set of tests.

Python的官方文档这样写到,unittest支持自动化测试,测试的安装分享和关闭代码……

一句话说来,就是,unittest很好用。

还是用上一次的readandadd.py来演示unittest的基本用法,首先介绍unittest的一个函数,assertEqual(first,second),这个函数的作用是检查变量first的值与second的值是否相等,如果不相等就抛出错误。

先创建utest.py文件,输入以下代码并运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#-*-coding:utf-8-*-
import unittest
import readandadd

class basictest(unittest.TestCase): #类名可以随便取
def testread(self): #每个函数都要以test开头
output = readandadd.read('1.txt')
self.assertEqual(output,'2,3')

def testgetnum(self):
output = readandadd.getnum('2,3')
self.assertEqual(output,['2', '3'])

def testaddnum(self):
output = readandadd.addnum([2,3])
self.assertEqual(output,5)

if __name__ == '__main__':
unittest.main()

运行结果如下:

1
2
3
4
5
...
----------------------------------------------------------------------
Ran 3 tests in 0.001s

OK

你也许会说,就一个ok,什么都没有啊。那我先把testread()函数下面的

1
self.assertEqual(output,'2,3')

改为

1
self.assertEqual(output,'2,4')

在运行utest.py看看输出结果如何:

1
2
3
4
5
6
7
8
9
10
11
12
13
..F
======================================================================
FAIL: testread (__main__.basictest)
----------------------------------------------------------------------
Traceback (most recent call last):
File "E:/mystuff/unitest/utest.py", line 8, in testread
self.assertEqual(output,'2,4')
AssertionError: '2,3' != '2,4'

----------------------------------------------------------------------
Ran 3 tests in 0.000s

FAILED (failures=1)

这里准确的找出了错误的位置和错误的具体内容。注意看最上面,有个

1
..F

猜测它可能是标示错误的位置。保持testread的错误不改,再把testgetnum()函数中的以下内容

1
self.assertEqual(output,['2', '3'])

改为

1
self.assertEqual(output,['2', '6'])

再运行utest.py程序,输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
.FF
======================================================================
FAIL: testgetnum (__main__.basictest)
----------------------------------------------------------------------
Traceback (most recent call last):
File "E:/mystuff/unitest/utest.py", line 12, in testgetnum
self.assertEqual(output,['2', '6'])
AssertionError: Lists differ: ['2', '3'] != ['2', '6']

First differing element 1:
3
6

- ['2', '3']
? ^

+ ['2', '6']
? ^


======================================================================
FAIL: testread (__main__.basictest)
----------------------------------------------------------------------
Traceback (most recent call last):
File "E:/mystuff/unitest/utest.py", line 8, in testread
self.assertEqual(output,'2,4')
AssertionError: '2,3' != '2,4'

----------------------------------------------------------------------
Ran 3 tests in 0.001s

FAILED (failures=2)

可以看出,这里分别把两个错误显示了出来。并且第一行变成了

1
.FF

所以,第一行的内容应该从右往左读,它标明错误函数在所有函数的相对位置。

现在再把testread()和testgetnum()改回去,再看看全部正确的输出:

1
2
3
4
5
...
----------------------------------------------------------------------
Ran 3 tests in 0.000s

OK

印证了那句话,没有消息就是最好的消息。

这篇文章介绍了单元测试模块unittest的assertEqual的基本用法,下一篇文章将要更加全面的介绍unittest模块。


本文首发地址:http://kingname.info/2015/03/04/pythonunittest2/

作者:青南 

转载请注明出处。

测试驱动的软件开发方式可以强迫程序员在开发程序的时候使程序的函数之间实现高内聚,低耦合。这样的方式可以降低函数之间的依赖性,方便后续的修改,增加功能和维护。

一个函数高内聚,就是指这个函数专注于实现单一的任务,不会做除了生产这个任务以外的其他事情。可以想象一个人,他把自己关在一个小房子里面生产东西,只留两扇窗户,他需要什么材料,你就从小窗户给他送进去(参数),他做好了东西,就给你从另一个窗户里面送出来(return),他不会说,我要生产一个轮子,但是我首先需要一个女人进来,他不会说,这是计划的一部分。

几个函数是低耦合的,就是指他们的依赖性小。他们就像是葫芦娃,每个都有自己独特的能力,可以自己单干,在关键的时候还可以合体,变成小金刚。他们就像积木一样,各有各的功能,需要使用的时候直接组合在一起就可以了。

使用测试驱动开发,每一个测试只测试一个功能,这样就可以迫使函数把自己独立出来,尽量减少和其他函数的依赖。

例如,有一个文件1.txt,他的内容是两个数字,使用逗号隔开。形如“2,4”(不包括外侧双引号,下同)。我要写一个程序readandadd.py,读取硬盘上的1.txt文件,然后把这个文件的内容打印到屏幕上。

不规范的写法一:

1
2
3
4
f= open('1.txt','r')
b = f.read().split(',')
f.close()
print int(b[0])+int(b[1])

不规范写法二:

1
2
3
4
5
6
def A():
f= open('1.txt','r')
b = f.read().split(',')
f.close()
print int(b[0])+int(b[1])
A()

比较规范的写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def read(filename):
f= open(filename,'r')
info = f.read()
f.close()
return info

def getnum(info):
twonum = info.split(',')
return twonum

def addnum(twonum):
return int(twonum[0])+int(twonum[1])

if __name__ == '__main__':
info = read('1.txt')
twonum = getnum(info)
result = addnum(twonum)
print result

这样写的好处是,如果想测试读文件的功能,就只需要测试read()函数,如果想测试把两个数分开的功能,就只需要测试getnum()函数。而相反,在不规范写法二中,虽然只想测试两个数字相加的功能,可是却不得不首先打开文件并读取文件然后把数字分开。

继续回到比较规范的写法当中,我相信很多人写完read()函数以后,肯定会输入如下代码:

1
2
3
4
5
6
7
def read(filename):
f= open(filename,'r')
info = f.read()
f.close()
return info

print read('1.txt')

然后运行程序,发现正常打印出’2,3’以后,再开始写getnum()函数。写完getnum以后,测试getnum()函数没问题以后再开始写然后测试addnum()函数。最后测试整个程序的功能。

其实这个过程,已经就是在做单元测试了。然而这样操作的弊端是什么?如果整体程序已经写好了,之前做测试点代码也就删除了。那么如果突然把程序做了修改。例如1.txt里面数字的分隔从1个逗号变成了空格,或者变成了3个数字,那必然要修改getnum(),但是又如何测试修改的部分呢?还要把不相干的代码给注释掉。不仅麻烦,而且容易出错。

现在,把测试的代码单独独立出来。会有什么效果呢?尝试创建一个test.py程序,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import readandadd

def testread():
print 'read:',readandadd.read('1.txt')

def testgetnum():
print 'getnum:',readandadd.getnum('2,3')

def testaddnum():
print 'addnum:',readandadd.addnum([2,3])

if __name__ == '__main__':
testread()
testgetnum()
testaddnum()

运行test.py以后输出结果如下:

1
2
3
read: 2,3
getnum: ['2', '3']
addnum: 5

每一个函数的输出结果一目了然,而且在修改了readandadd.py的函数以后,重新运行test.py就可以知道输出结果有没有符合预期。

当然,这里这个例子非常的简单,因此可以人工通过观察test.py的输出结果来确定是否符合预期,那对于大量的函数的测试,难道也要让肉眼来看吗?当然不是。于是,下一篇文章将会介绍Python的单元测试unittest。

最近在学习Flask,本着学以致用的原则,开发了“未知道”这个网站。未知道的宗旨是,不注册,不审核,不删帖,无私聊,不记录任何信息。是一个派对式的或者说广场式的真·匿名聊天环境。

网站地址请戳->http://unknown.kingname.info/

网站上线首日访问量突破2000,上线第二日访问量突破10000。

现在到处都在要求实名制,请珍惜这少有的一片匿名的净土。

这个网站目前仍然在每天增加新的功能,我会开一个系列文章,用以记录在这个网站的开发过程中遇到的问题以及解决方案。请大家保持关注。

在写爬虫的时候,经常会使用xpath进行数据的提取,对于如下的代码:

1
<div id="test1">大家好!</div>

使用xpath提取是非常方便的。假设网页的源代码在selector中:

1
data = selector.xpath('//div[@id="test1"]/text()').extract()[0]

就可以把“大家好!”提取到data变量中去。

然而如果遇到下面这段代码呢?

1
<div id="test2">美女,<font color=red>你的微信是多少?</font><div>

如果使用:

1
data = selector.xpath('//div[@id="test2"]/text()').extract()[0]

只能提取到“美女,”;

如果使用:

1
data = selector.xpath('//div[@id="test2"]/font/text()').extract()[0]

又只能提取到“你的微信是多少?”

可是我本意是想把“美女,你的微信是多少?”这一整个句子提取出来。

这还不是最糟糕的,还有第三段代码:

1
<div id="test3">我左青龙,<span id="tiger">右白虎,<ul>上朱雀,<li>下玄武。</li></ul>老牛在当中,</span>龙头在胸口。<div>

而且内部的标签还不固定,如果我有一百段这样类似的html代码,又如何使用xpath表达式,以最快最方便的方式提取出来?

我差一点就去用正则表达式替换了。还好我去Stack Overflow上面提了问。于是很快就有人给我解答了。

使用xpath的string(.)

以第三段代码为例:

1
2
data = selector.xpath('//div[@id="test3"]')
info = data.xpath('string(.)').extract()[0]

这样,就可以把“我左青龙,右白虎,上朱雀,下玄武。老牛在当中,龙头在胸口”整个句子提取出来,赋值给info变量。

文章首发地址:http://kingname.info

今天在Github更新代码的时候,不小心把Gmail私钥文件更新上去了。即便我立刻删除了这个文件,可是在版本历史里面仍然可以看到这个文件的内容。这可把我吓坏了。

Google一圈以后,终于找到了解决办法。把某个文件的历史版本全部清空。

首先cd 进入项目文件夹下,然后执行以下代码:

1
2
3
4
5
6
7
8
9
10
11
git filter-branch --force --index-filter 'git rm --cached --ignore-unmatch 文件名' --prune-empty --tag-name-filter cat -- --all

git push origin master --force

rm -rf .git/refs/original/

git reflog expire --expire=now --all

git gc --prune=now

git gc --aggressive --prune=now

虽然不知道他们的作用是什么,不过真的解决了我的问题。看起来,以前我说我熟练掌握git,真是自不量力。

文章首发地址:http://kingname.info

这篇文章不会教你在技术角度上使用log,而是告诉你为什么要使用log日志功能。

为什么要使用Log

使用微信控制你的电脑这篇文章中,我写好了电脑端的程序,使用py2exe生成可执行文件,并把它们发送给我的朋友让他们进行测试。但是他们把_config.ini设置好以后,运行程序就看到一个黑色窗口一闪而过。或者有些人一开始看到程序能正常登陆邮箱,但是准备执行命令的时候,窗口自动关闭。

由于没有日志记录程序的运行状态,我根据他们的描述很难定位到错误在哪个地方。于是折腾了一个下午。

这个时候,我觉得添加一个日志的功能迫在眉睫。

哪些地方应该用Log

目前网上能找到的关于如何使用日志的文章,全部都是从技术角度讲怎么使用log:在XX地方应该先imort logging,然后basicconfig设定XX内容。可是我现在的问题是:

  • 应该在程序的哪些地方添加日志的输出?
  • 输出什么内容?
  • 如何输出才能以方便我的监控程序的运行情况?

于是我只有自己摸索。因此,以下内容是我自己摸索出来的野路子,可能会有错漏。希望有经验的朋友能给我指正,非常感谢。

这些地方应该用Log

使用使用微信控制你的电脑文章中涉及到的例子

程序入口代码如下:

1
2
3
4
5
6
7
if __name__=='__main__':
init()
print u'等待接收命令'
logging.info(u'初始化完成。')
while 1:
time.sleep(int(time_limit)) #每5分钟检查一次邮箱
accp_mail()

以上代码表示程序运行以后,首先执行init()函数,于是如果init()初始化没有什么问题,成功执行完成以后,就应该在日志中输出“初始化完成”,然后进入接收邮件的循环。如果程序窗口运行以后一闪而过,而且生成的日志中没有初始化完成这样的字眼,那就说明问题出在初始化上面。

然而初始化函数里面代码也有很多,又如何知道是初始化程序里面的什么地方出问题了呢?

所以,再初始化函数里面,也应该有一定的日志记录。

再看初始化函数的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def init():
global username,password,host,boss_email,time_limit

try:
f = open('_config.ini','r')
except IOError,e:
logging.error(e)
exit()

info = f.readlines()
try:
host = re.search('host:(.*?)\n',info[0],re.S).group(1)
username = re.search('username:(.*?com)',info[1],re.S).group(1)
password = re.search('password:(.*?)\n',info[2],re.S).group(1)
boss_email = re.search('boss_email:(.*?com)',info[3],re.S).group(1)
time_limit = re.search('time_limit:(.*?)\n',info[4],re.S).group(1)
except Exception,e:
logging.error(e)

logging.info(u'打开配置文件成功。。。')


#将命令生成字典,便于查询
command_start = info.index('<command>\n')
command_end = info.index('</command>\n')
for each in info[command_start+1:command_end]:
command = each.split('=')
command_dict[command[0]] = command[1]

logging.info(command_dict)

open_start = info.index('<open_file>\n')
open_end = info.index('</open_file>\n')
for each in info[open_start+1:open_end]:
open_file = each.split('=')
open_dict[open_file[0]] = open_file[1][:-1]

logging.info(open_dict)

f.close()

在这段代码中,我使用try except命令捕获异常,如果发生异常,就使用logging.error将错误写入日志中。例如当_config.ini被改名了或者被删除的时候,程序就会报错,而通过日志就能发现这个错误。

经过上面的代码,如果在初始化的过程中出错,就可以很快确定问题出在什么地方。

初始化完成以后,进入邮件接收阶段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def accp_mail():
logging.info(u'开始检查邮箱')
try:
pp = poplib.POP3_SSL(host)
pp.set_debuglevel(1)
pp.user(username)
pp.pass_(password)
ret = pp.list()
logging.info(u'登录邮箱成功。')
except Exception,e:
logging.error(e)
exit()

logging.info(u'开始抓取邮件。')
try:
down = pp.retr(len(ret[1]))
logging.info(u抓取邮件成功。'')
except Exception,e:
logging.error(e)
exit()

logging.info(u'开始抓取subject和发件人')
try:
subject = re.search("Subject: (.*?)',",str(down[1]).decode('utf-8'),re.S).group(1)
sender = re.search("'X-Sender: (.*?)',",str(down[1]).decode('utf-8'),re.S).group(1)
logging.info(u'抓取subject和发件人成功')
except Exception,e:
logging.error(e)
exit()

if subject != 'pass':
if sender == boss_email:
DealCommand(subject)
pp.quit()

以上这段代码,对邮箱的登录与邮件的读取均作了监控,一旦有某个环节出了问题,就会体现在日志中。

通过在登录环节的try except返回的错误日志,发现有很多朋友无法登录邮箱,而密码用户名都没有错,从而推断是没有在新浪邮箱的账户控制里面打开客服端接收POP3和SMTP的功能。

再来看DealCommand()函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def DealCommand(subject):
logging.info(u'开始处理命令。')
send_mail('pass','slave')
if subject in command_dict:
logging.info(u'执行命令')
try:
command = command_dict[subject]
os.system(command)
send_mail('Success','boss')
logging.info(u'执行命令成功')
except Exception,e:
logging.error(e)
send_mail('error','boss',e)
elif subject in open_dict:
logging.info(u'打开文件')
try:
open_file = open_dict[subject]
win32api.ShellExecute(0, 'open', open_file, '','',1)
send_mail('Success','boss')
logging.info(u'打开文件成功')
except Exception,e:
logging.error(e)
send_mail('error','boss',e)
else:
send_mail('error','boss','no such command')

执行命令的地方可能会出错,于是果断使用try except捕获错误。并使用日志记录。

最后是send_mail()函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def send_mail(subject,flag,body='Success'):

msg = MIMEText(body,'plain','utf-8')#中文需参数‘utf-8’,单字节字符不需要
msg['Subject'] = subject
msg['from'] = username
logging.info('开始配置发件箱。')
try:
handle = smtplib.SMTP('smtp.sina.com', 25)
handle.login(username,password)
logging.info('发件箱配置成功')
except Exception,e:
logging.error(e)
exit()

logging.info(u'开始发送邮件'+ 'to' + flag)
if flag == 'slave':
try:
handle.sendmail(username,username, msg.as_string())
logging.info(u'发送邮件成功')
except Exception,e:
logging.error(e)
exit()
elif flag == 'boss':
try:
handle.sendmail(username,boss_email, msg.as_string())
logging.info(u'发送邮件成功')
except Exception,e:
logging.error(e)
exit()

handle.close()
logging.info(u'发送邮件结束'+flag)

这里对邮件发件的部分需要特别仔细的错误捕获,然后记录进入日志中。

完整的代码见:https://github.com/kingname/MCC.git中的auto.py

总结

需要使用日志记录的地方大致有一下几处:

  • 所有输入输出处,无论是从文件输入还是从网络等其他地方输入
  • 执行命令处
  • 调用函数处

PS

这里我对一般信息的记录使用了info,实际上,一般用作调试的话,是使用debug更多。

需要用户输入的地方,总会有想不到的错误,多小心都不为过。例如,用户可能会把time_limit设定为一个全角数字。而本文中就没有捕获这种问题到日志中。所以如果不放心的话,还可以更进一步的细化日志。

在上一篇文章使用AWS亚马逊云搭建Gmail转发服务(二)中,我们已经介绍了如何把邮件转发程序部署在服务器上。但是这样还不够。还需要实时监控程序的运行状态。于是,给程序增加日志记录功能是非常重要的。

日志

这里使用Python的logging库,实现日志记录功能。

1
2
3
4
5
6
7
8
import logging

logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(filename)s %(levelname)s %(message)s',
datefmt='%Y %m %d %H:%M:%S', #日期格式:年月日时分秒
filename='mail_note.log', #文件名
filemode='a') #以最佳的方式添加日志
mail_log = logging.getLogger('maillog')

以上代码的作用是导入logging库功能。然后配置logging的输出格式。
各行代码的作用已经注释。

要使用日志的时候,通过以下代码:

1
logging.info('日志内容')

同类的还有:

1
2
3
4
mail_log.debug('内容')
mail_log.warning('内容')
mail_log.error('内容')
……

logging库的功能还有很多,这里只是简单的介绍一下,更多的功能可以查阅相关的资料。

Flask

现在日志已经生成。又如何通过Flask查看呢?由于我的前端不行。因此这里就不使用精细的模板了。Flask的部署就不叙述了,各位可以参考Flask官方文档http://dormousehole.readthedocs.org/en/latest/

这里我只演示一个非常简单的日志输出功能。编写gmail_flask.py,请看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#-*-coding:utf-8-*-
from flask import Flask
app = Flask(__name__)

@app.route('/')
def index():
f = open('mail_note.log','rb') #以读文件的方式打开mail_note.log文件
content = f.readlines()#按行读取日志
s = ''
for each in content:
s += each
s += '</p>'#输出日志
f.close()
return s

if __name__ == '__main__':
app.run(host='0.0.0.0') #开发外网访问

这个功能是把日志按行输出到网页上。
现在测试一下功能:
在终端窗口输入:

1
2
screen
python gmail_helper.py

然后Ctrl+A+D返回,再输入:

1
2
screen
python gmail_flask.py

然后访问服务器的5000端口查看效果。如图是我的服务器返回信息:

这里出现了Google的很多信息,这是由于Gmail的API库文件discovery.py里面也有用到日志功能。这个时候这里调用根logging,就会把discovery.py里面logging.info输出的信息写出来。这个时候怎么办呢?我对logging不是很熟悉,还请熟悉logging模块的朋友指点迷津。

我使用了一个变通的办法:

修改gmail_flask.py文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#-*-coding:utf-8-*-
from flask import Flask
app = Flask(__name__)

@app.route('/')
def index():
f = open('mail_note.log','rb') #以读文件的方式打开mail_note.log文件
content = f.readlines()#按行读取日志
s = ''
for each in content:
if 'gmail_helper.py' in each: #判定信息来自gmail_helper.py而不是discovery.py
s += each
s += '</p>'#输出日志
f.close()
return s

if __name__ == '__main__':
app.run(host='0.0.0.0') #开发外网访问

效果如下图:

源代码已更新到Github,请戳->https://github.com/kingname/MCC/blob/master/ghelper_with_log

我的日志会通过博客进行开放,地址请戳:

http://flask.kingname.info:5000

使用AWS亚马逊云搭建Gmail转发服务(二)中,我们最后运行了邮件转发程序。本以为程序就可以正常工作了,于是我关闭了Putty窗口。几个小时后回来,发现程序早就终止运行了。

原来,在一般情况下,当一个session结束时,这个session里面运行的进程也会同时结束。这可不能达到我们要的效果。于是screen命令登场了。

使用screen命令,可以让程序在断开session的时候继续运行。要打开screen,只需要在终端输入screen这个命令即可。请看下面演示:

1
2
3
cd wwwproject/ghelper
screen
python gmail_helper.py

这样就在一个screen里面运行了邮件转发程序。那么如何退出呢?

键盘上Ctrl+A+D三个键一起按。这样就返回到了进入screen之前的终端界面。而邮件转发程序仍然在后台默默的运行。现在可以关闭putty,然后放心的去睡觉了。

那重新SSH登录服务器以后,想关闭这个邮件转发程序怎么办?

两个方法:

方法一,直接结束Python进程。

方法二,在终端窗口输入:

1
screen -ls

终端窗口返回:

1
2
3
4
5

ubuntu@ip-172-31-15-35:~$ screen -ls
There is a screen on:
7956.pts-0.ip-172-31-15-35 (01/01/2015 12:16:10 PM) (Detached)
1 Socket in /var/run/screen/S-ubuntu.

注意这里的7956就是pid,于是输入:

1
screen -r 7956

就能回到Python的运行窗口了。于是,Ctrl+C结束程序运行。

有了screen命令,再也不怕关闭session后程序结束运行了。

在上一篇文章使用AWS亚马逊云搭建Gmail转发服务(一)中,我们介绍了如何在亚马逊AWS的免费主机EC2中使用Gmai API从而接收邮件的操作。在这篇文章中,将要讲解如何制作一个邮件转发服务。

我之前有写一篇文章,使用微信控制你的电脑其中有讲解如何使用Python的smtplib库实现发送邮件。于是Gmail邮件转发的思路就出来了:

程序定期检查Gmail邮箱,如果发现有新的邮件,就将新邮件的标题,发送人,还有邮件正文提取出来,并使用 MIMEText构造一个邮件的object 然后再用国内邮箱发送给自己的主邮箱。

这里涉及到三个邮箱:Gmail,国内邮箱一(发送),国内邮箱二(接收)。其中国内邮箱二是我的常用邮箱,我将它和微信绑定,因此一旦有新邮件,就会收到提醒。国内邮箱一是一个备胎邮箱,他的作用就是一个转发而已。

可能有人会问为什么不用Gmail直接转发?因为我觉得很有可能不久以后,Gmail发送的邮件,国内邮箱收不到。

那么我们将使用微信控制你的电脑这篇文章中涉及到的auto.py进行修改,编写一个ghelper_sender.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#-*-coding:utf-8 -*-

import smtplib
import sys
from email.mime.text import MIMEText

reload(sys)
sys.setdefaultencoding('utf-8')

username = "[email protected]"# 接收邮箱
password = "123abc"# 接收邮箱密码
mailbox = "[email protected]" #国内邮箱二

def send_mail(subject,body='Success'):
msg = MIMEText(body,'plain','utf-8')#中文需参数‘utf-8’,单字节字符不需要
msg['Subject'] = subject
msg['from'] = username
handle = smtplib.SMTP('smtp.sina.com', 25)
handle.login(username,password)
handle.sendmail(username,mailbox, msg.as_string())
handle.close()

这里需要注意,msg[‘from’] 的值必须是国内邮箱一,而不是发邮件到Gmail邮箱的那个地址,否则会报错。

现在打开上一篇文章中的ghelper_api.py,在最开头添加:

1
from ghelper_sender import send_mail

导入send_mail函数。

由于ghelper_api.py 中,上一篇文章中通过

1
2
a = ListMessagesWithLabels(gmail_service,'me')[0]['id']
b = GetMessage(gmail_service,'me',a)

获得你邮件的正文和发件人,现在再获取邮件的标题:

1
subject = b['payload']['headers'][12]['value']

这里我把发件人和邮件正文添加到一起,于是得到以下代码:

1
2
3
4
5
6
7
8
9
10
a = ListMessagesWithLabels(gmail_service,'me')[0]['id']
b = GetMessage(gmail_service,'me',a)
content = b['snippet']
sender = b['payload']['headers'][3]['value']
subject = b['payload']['headers'][12]['value']
body = sender + '\n' + content
send_mail(subject,body)
print subject
print body
print '============================'

这样就能发送一次邮件了。接收到的效果如图:


但是这显然不是我们需要的效果。我们希望这个程序能够自动运行,自动查看邮箱。于是,编写一个函数isnew()来检测是否有新的邮件发送过来,如果有就获取新邮件并发送。

设定一个全局变量last_email用于保存已读的最后一封邮件,令他的初始化为当前邮箱里面最新的一封邮件的id.接下来每过一段时间,程序就检查邮箱里面邮件的id,如果和last_email相同,就什么都不做;如果不相同,则程序就读取邮件,直到读取到某一封邮件的id和last_email相同为止。再令last_email的值为当前最新的id.代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def init():
global last_email
last_email = ListMessagesWithLabels(gmail_service,'me')[0]['id']

def isnew():
global last_email
themail = ListMessagesWithLabels(gmail_service,'me')
for each in themail:
if each['id'] != last_email:
tosend(each['id'])
else:
break
last_email = themail[0]['id']

def tosend(id):
b = GetMessage(gmail_service,'me',id)
content = b['snippet']
sender = b['payload']['headers'][3]['value']
subject = b['payload']['headers'][12]['value']
body = sender + '\n' + content
send_mail(subject,body)
print subject
print body
print '============================'

设定一个主函数,并把上面的代码放在主函数里面,并循环执行:

1
2
3
4
5
if __name__=='__main__':
init()
while 1:
time.sleep(3600) #每一小时检查一次邮箱
isnew()

完整的代码我放在了Github上面,请戳->https://github.com/kingname/MCC/tree/master/ghelper

在下一篇文章中,将介绍Python的logging库,并将每次的发送记录通过日志记录下来。同时用Flask搭建一个网站,从而方便直观的检查这个程序的运行情况。

0%