# Python–fluent
#.1.python数据模型
from collections import namedtuple
from random import choice
# 工厂函数,具名元组namedtuple,除了使用index,可以使用每个字段名称来访问
# Record type built by the namedtuple factory: fields are reachable both by
# index and by name.
Card = namedtuple('Card', ['rank', 'suit'])


class FrenchDesk:
    """A 52-card French deck that behaves like a read-only sequence.

    Implementing ``__len__`` and ``__getitem__`` is enough for ``len()``,
    indexing, slicing, iteration, ``random.choice`` and ``sorted`` to work.
    """

    ranks = [str(number) for number in range(2, 11)] + list('JQKA')
    suits = 'spades diamonds clubs hearts'.split()

    def __init__(self):
        # Suits form the outer loop, so the deck is grouped by suit.
        deck = []
        for suit in self.suits:
            for rank in self.ranks:
                deck.append(Card(rank, suit))
        self._cards = deck

    def __len__(self):
        """Return the number of cards in the deck."""
        return len(self._cards)

    def __getitem__(self, pos):
        """Return the card (or list of cards, for a slice) at ``pos``."""
        # Delegating to the list is what makes the deck indexable and iterable.
        return self._cards[pos]
test_card = FrenchDesk()
# print(test_card[0])
# print(choice(test_card))
# print(test_card[12::13])
# Suit weights, used to break ties between cards of the same rank.
suit_values = dict(spades=3, hearts=2, clubs=1, diamonds=0)


def spaders_high(card):
    """Return a sort key that orders cards by rank, then by suit weight.

    The rank position in ``FrenchDesk.ranks`` is scaled by the number of
    suits so that the suit weight only decides between equal ranks.
    """
    rank_index = FrenchDesk.ranks.index(card.rank)
    suit_weight = suit_values[card.suit]
    return rank_index * len(suit_values) + suit_weight
for card in sorted(test_card, key=spaders_high):
print(card)
from math import hypot
class Vector:
    """Minimal two-dimensional vector illustrating numeric special methods."""

    def __init__(self, x=0, y=0):
        self.x, self.y = x, y

    def __repr__(self):
        """Return the developer-facing form, e.g. ``Vector(3, 4)``.

        ``__repr__`` is what the console shows directly; ``__str__`` is only
        used by ``str()`` and ``print`` (and falls back to ``__repr__``).
        """
        # %r keeps the repr of each component (Vector(1, 2) vs Vector('1', '2')).
        return 'Vector(%r, %r)' % (self.x, self.y)

    def __abs__(self):
        """Return the Euclidean length of the vector."""
        return hypot(self.x, self.y)

    def __bool__(self):
        """Return False only for a zero-length vector.

        ``bool(x)`` calls ``__bool__``; when it is missing Python falls back
        to ``__len__``.
        """
        magnitude = abs(self)
        return bool(magnitude)

    def __add__(self, other):
        """Return a new Vector holding the component-wise sum."""
        return Vector(self.x + other.x, self.y + other.y)

    def __mul__(self, scalar):
        """Return a new Vector with both components multiplied by ``scalar``."""
        scaled_x = self.x * scalar
        scaled_y = self.y * scalar
        return Vector(scaled_x, scaled_y)
a = Vector(1, 2)
b = Vector(2, 3)
print(a + b)
print(bool(a))
#.2.序列构成的数组
import random
from collections import namedtuple
import bisect
import sys
from array import array
# 容器序列:可以存放不同数据类型
# list、tuple、deque
# 扁平序列:只能容纳一种序列类型
# str、bytes、bytearray、memoryview、array.array
# 列表推导listcomps
codes = [ord(symbol) for symbol in '$¢£¥€¤']
print(codes)
# filter map (function, iterable)
#beyond_ascii = [ord(s) for s in '$¢£¥€¤' if ord(s) > 127]
beyond_ascii = list(filter(lambda c: c > 127, map(ord, '$¢£¥€¤')))
print(beyond_ascii)
# 生成式表达器genexps
# 相比于列表推导,不是建好完整列表,而是逐个产出元素
symbol_ord = (ord(symbol) for symbol in '$¢£¥€¤')
print(symbol_ord)
# 元组与记录
# 元组拆包
lax_coordinates = (33.9425, -118.408056)
latitude, longitude = lax_coordinates
t = (20, 8)
print(divmod(*t))
# 用*来处理剩下的元素
a, b, *res, d = range(5)
print(res)
# 具名元组namedtuple
City = namedtuple('City', 'name country population coordinates')
tokyo = City('Tokyo', 'JP', 36.933, (35.689722, 139.691667))
print(City._fields) # 输出属性
LatLong = namedtuple('LatLong', 'lat long')
delhi_data = ('Delhi NCR', 'IN', 21.935, LatLong(28.613889, 77.208889))
delhi = City._make(delhi_data) # 接受可迭代对象生成实例
print(delhi._asdict())
# 切片
invoice = '''
... 1909 Pimoroni PiBrella $17.50 3 $52.50
... 1489 6mm Tactile Switch x20 $4.95 2 $9.90
... 1510 Panavise Jr. - PV-201 $28.00 1 $28.00
... 1601 PiTFT Mini Kit 320x240 $34.95 1 $34.95
'''
UNIT_PRICE = slice(40, 52)
DESCRIPTION = slice(6, 40)
line_items = invoice.split('\n')[2:]
for item in line_items:
print(item[UNIT_PRICE], item[DESCRIPTION])
# 多维切片和省略
# a[m:n, k:l] x[i, ...] 就是 x[i, :, :, :]
# 建立由列表组成的列表
board = [['_'] * 3 for i in range(3)]
# 序列的增量赋值
l = [1, 2, 3]
print(id(l))
l *= 2
print(id(l))
t = (1, 2, 3)
print(id(t))
t *= 2
print(id(t)) # 对于不可变序列,拼接操作创建了新对象
print()
# sort和sorted
# 返回None表示就地操作
# Timsort 稳定排序
# 它是一种自适应算法,会根据原始数据的顺序特点交替使用插入排序和归并排序,以达到最佳效率
# bisect管理已排序的序列
import bisect
import sys
# Sorted sequence that is searched, and the values looked up in it.
HAYSTACK = [1, 4, 5, 6, 8, 12, 15, 20, 21, 23, 23, 26, 29, 30]
NEEDLES = [0, 1, 2, 5, 8, 10, 22, 23, 29, 30, 31]
# Row layout: needle, insertion position, an offset bar, then the needle again.
ROW_FMT = '{0:2d} @ {1:2d} {2}{0:<2d}'


def demo(bisect_fn):
    """Print, for every needle (largest first), where ``bisect_fn`` places it.

    ``bisect_fn`` is called as ``bisect_fn(HAYSTACK, needle)`` and must return
    the insertion position as an integer.
    """
    for needle in NEEDLES[::-1]:
        position = bisect_fn(HAYSTACK, needle)
        # One ' |' per position visually shifts the needle to its slot.
        print(ROW_FMT.format(needle, position, ' |' * position))
if sys.argv[-1] == 'left':
bisect_fn = bisect.bisect_left
else:
# bisect函数其实是bisect_right函数的别名,默认放置在右边
bisect_fn = bisect.bisect
print('DEMO:', bisect_fn.__name__)
print('haystack ->', ' '.join('%2d' % n for n in HAYSTACK))
demo(bisect_fn)
# bisect.insort插入新元素
# insort(seq, item)把变量item插入到序列seq中,并能保持seq的升序顺序
# 有两个可选参数lo和hi来缩小搜索范围,lo的默认值为0,hi的默认值为序列的长度
mylist = []
for i in range(5):
value = random.randrange(10)
bisect.insort(mylist, value)
print('%2d ->' % value, mylist)
# 数组array.array,当需要只包含数字的列表,数组比list更高效
# 使用tofile和fromfile速度很快,也可以使用pickle.dump
array_test = array('d', (random.random() for i in range(10)))
print(array_test)
# 内存视图memoryview
# 在不复制内容的情况下操作同一个数组的不同切片
numbers = array('h', [-2, -1, 0, 1, 2]) # 'h'有符号整型
memv = memoryview(numbers)
memv_oct = memv.cast('B')
print(memv_oct.tolist())
memv_oct[5] = 4
print(numbers)
# numpy和scipy
# NumPy和SciPy提供的高阶数组和矩阵操作
# Pandas和Blaze数据分析库就以它们为基础提供了高效的且能存储非数值类数据的数组类型,和读写常见数据文件格式(例如 csv、xls、SQL 转储和 HDF5)的功能。
# 双向队列和其他形式队列
# deque、heapq、queue、multiprocessing、asyncio
#.3.字典与集合
from collections import *
from types import MappingProxyType
from dis import dis
# 字典的不同构造方法
a = dict(one=1, two=2, three=3)
b = {'one': 1, 'two': 2, 'three': 3}
c = dict(zip(['one', 'two', 'three'], [1, 2, 3]))
d = dict([('two', 2), ('one', 1), ('three', 3)])
e = dict({'three': 3, 'one': 1, 'two': 2})
print(a == b == c == d == e)
f = {i: i+10 for i in range(5)}
f['one'] = 100
print(f)
# update方法处理参数m的方式,是典型的“鸭子类型”
# get方法防止找不到键值
f.update(a)
print(f)
# setdefault处理找不到的键
# setdefault(key[, default])
# 对于字典中已经有这个key,直接return这个key对应的值
# 没有key,可以指定默认值,否则None返回
f.setdefault(10, 0)
print(f)
# defaultdict处理找不到的键
g = defaultdict(list)
print(g[0])
# 特殊方法__missing__
# __missing__ 方法只会被 __getitem__ 调用
# OrderedDict()添加键的时候会保持顺序
h = OrderedDict()
h[1] = 2
h[2] = 3
h[3] = 4
h.popitem()
print(h)
# ChainMap可以容纳数个不同的映射对象
# Counter整数计数器
# 子类化UserDict
# 不可变映射类型
# MappingProxyType,动态的,i的变化都会反馈到i_proxy
i = {1: 'A'}
i_proxy = MappingProxyType(i)
print(i_proxy)
i[2] = 'B'
print(i_proxy)
# 集合论
j = {1, 2, 3}
k = {3, 4, 5}
print(j|k, j-k, j&k, j^k) # 并集、差集、交集、对称差集
# set、frozenset(集合元素不可变,可散列)
# dis('{1}') # 查看字节码,cpu内部运行
# dis('set([1])')
# dict和set的背后(字典中的散列表)
# 散列表其实是一个稀疏数组(空间换时间),散列表里的单元通常叫作表元(bucket)。在 dict 的散列表当中,每个键值对都
# 占用一个表元,每个表元都有两个部分,一个是对键的引用,另一个是对值的引用。因为
# 所有表元的大小一致,所以可以通过偏移量来读取某个表元。
# 因为 Python 会设法保证大概还有三分之一的表元是空的,所以在快要达到这个阈值的时
# 候,原有的散列表会被复制到一个更大的空间里面。
#.4.文本和字节序列
from array import array
from unicodedata import normalize
# 编码与解码
s = 'café' # 4个Unicode字符
b = s.encode('utf8') # utf8编码,把str对象编码成bytes对象
print(len(b), b, type(b))
c = b.decode('utf8') # utf8解码
print(len(c), c)
# bytes bytearray
cafe = bytes('café', encoding='utf_8')
print(cafe)
print(cafe[0], ord('c'), cafe[:1])
cafe_arr = bytearray(cafe)
print(cafe_arr)
print(cafe_arr[-1:])
# 虽然二进制序列其实是整数序列,但是它们的字面量表示法表明其中有 ASCII 文本
# • 可打印的 ASCII 范围内的字节(从空格到 ~ ),使用 ASCII 字符本身。
# • 制表符、换行符、回车符和 \ 对应的字节,使用转义序列 \t 、 \n 、 \r 和 \\ 。
# • 其他字节的值,使用十六进制转义序列(例如, \x00 是空字节)
numbers = array('h', [-2, -1, 0, 1, 2])
numbyte = bytes(numbers)
print(numbyte)
# bytes -> str 解码输入的字节序列
# 100%str 只处理文本
# str -> byte 编码输出的文本
# 为了正确比较而规范化Unicode字符串
s1 = 'café'
s2 = 'cafe\u0301'
print(len(s1), len(s2))
# NFC(Normalization Form C)使用最少的码位构成等价的字符串
print(len(normalize('NFC', s1)), len(normalize('NFC', s2)))
# 而NFD把组合字符分解成基字符和单独的组合字符
print(len(normalize('NFD', s1)), len(normalize('NFD', s2)))
print(s1 == s2)
print(normalize('NFC', s1) == normalize('NFC', s2))
# casefold()不区分大小写
print(normalize('NFC', s1).casefold() == normalize('NFC', s2).casefold())
# 使用Unicode排序算法排序
# import pyuca
#.5.一等函数
from functools import reduce
from operator import add, mul
from functools import partial
import random
# 把函数视作对象
# Recursive factorial. The docstring below is printed by the surrounding
# script via ``fun_test.__doc__``, so its text is left untouched.
def fun_test(n):
    '''返回n!'''
    # 0! and 1! are both 1; the previous ``n if n < 2`` returned 0 for n == 0.
    return 1 if n < 2 else n * fun_test(n - 1)
print(fun_test.__doc__)
print(type(fun_test), fun_test)
fun2 = fun_test
print(fun2)
print(list(map(fun2, range(11))))
# 高阶函数
fruits = ['strawberry', 'fig', 'apple', 'cherry', 'raspberry', 'banana']
print(sorted(fruits, key=len))
# map、filter、reduce的现代替代品
print(list(map(fun_test, range(6))))
print([fun_test(n) for n in range(6)])
print(list(map(fun_test, filter(lambda n: n % 2, range(6)))))
print([fun_test(n) for n in range(6) if n % 2])
print(reduce(add, range(100)))
print(sum(range(100)))
print(all([1, 1, 0])) # 如果iterable的每个元素都是真值,返回True;all([])返回True
print(any([1, 1, 0])) # 只要iterable中有元素是真值,就返回True;any([])返回 False
# 匿名函数 lambda
print(sorted(fruits, key=lambda word: word[::-1]))
# 可调用对象
print([callable(ob) for ob in (abs, str, 12)]) # 判断对象能否调用
# 用户定义的可调用类型
# 实现方法__call__
class A:
    """Callable container that hands out its items in shuffled order.

    Instances are callable because the class implements ``__call__``.
    """

    def __init__(self, item):
        # Copy the iterable so shuffling never touches the caller's data.
        self._item = list(item)
        random.shuffle(self._item)

    def __call__(self, *args, **kwargs):
        """Make ``instance()`` a shortcut for ``instance.pop_value()``."""
        return self.pop_value()

    def pop_value(self):
        """Remove and return one item.

        Raises:
            LookupError: if no items are left.
        """
        try:
            picked = self._item.pop()
        except IndexError:
            raise LookupError('item is empty, can\'t pop')
        else:
            return picked
a = A(range(10))
print(a.pop_value())
# 内置__call__函数判定可调用对象
print(a(), callable(a))
# 函数内省 dir(factorial)
# 从定位参数到仅限关键字参数
def tag(name, *content, cls=None, **attrs):
    """Generate one or more HTML tags.

    Args:
        name: tag name.
        *content: zero or more bodies; one element is produced per body, or a
            single self-closing tag when there are none.
        cls: keyword-only stand-in for the ``class`` attribute (``class`` is
            a reserved word).
        **attrs: any other keyword becomes a tag attribute.

    Returns:
        The rendered tag(s); multiple elements are joined with newlines.
    """
    if cls is not None:
        attrs['class'] = cls
    # Joining an empty sequence yields '', so no separate empty case is needed.
    rendered_attrs = ''.join(
        ' %s="%s"' % (attr, value) for attr, value in sorted(attrs.items()))
    if not content:
        return '<%s%s />' % (name, rendered_attrs)
    elements = ['<%s%s>%s</%s>' % (name, rendered_attrs, body, name)
                for body in content]
    return '\n'.join(elements)
print(tag('br'))
print(tag('p', 'hello'))
print(tag('p', 'hello', 'world'))
print(tag('p', 'hello', id=33))
print(tag('p', 'hello', 'world', cls='sidebar'))
print(tag(content='testing', name="img"))
my_tag = {'name': 'img', 'title': 'Sunset Boulevard', 'src': 'sunset.jpg', 'cls': 'framed'}
print(tag(**my_tag))
# 函数注解
def clip(text:str, max_len:'int > 0'=80) -> str:
    """Return ``text`` clipped at the last space before, or the first space
    after, ``max_len``.

    Text no longer than ``max_len``, or containing no usable space, is
    returned whole. Trailing whitespace is stripped from the result.
    The annotations are documentation only; Python does not enforce them.
    """
    end = None
    if len(text) > max_len:
        space_before = text.rfind(' ', 0, max_len)
        if space_before >= 0:
            end = space_before
        else:
            # No space inside the limit: cut at the first space beyond it.
            space_after = text.find(' ', max_len)
            if space_after >= 0:
                end = space_after
    if end is None:
        end = len(text)
    return text[:end].rstrip()
# operator模块
def fact(n):
    """Return n! computed with ``reduce`` and a lambda.

    The initial value 1 makes ``fact(0)`` return 1 instead of raising
    ``TypeError`` on the empty range.
    """
    return reduce(lambda a, b: a*b, range(1, n+1), 1)
def fact1(n):
    """Return n! computed with ``reduce`` and ``operator.mul``.

    The initial value 1 makes ``fact1(0)`` return 1 instead of raising
    ``TypeError`` on the empty range.
    """
    return reduce(mul, range(1, n+1), 1)
# itemgetter attrgetter methodcaller
# functools.partial 冻结参数
# 把原函数的某些参数固定。
# 使用这个函数可以把接受一个或多个参数的函数改编成需要回调的API,这样参数更少
triple = partial(mul, 3)
print(triple(7))
print(list(map(triple, range(1,10))))
# BObo Web应用的面向对象方式
#.6.使用一等函数实现设计模式
# 《设计模式:可复用面向对象软件的基础》
from collections import namedtuple
from abc import ABC, abstractmethod
Customer = namedtuple('Customer', 'name fidelity') # 名字 积分
class LineItem:
    """One cart entry: a product with its quantity and unit price."""

    def __init__(self, product, quantity, price):
        self.product, self.quantity, self.price = product, quantity, price

    def total(self):
        """Return the price of this line (unit price times quantity)."""
        line_total = self.price * self.quantity
        return line_total
class Order:
    """The context of the strategy pattern: a customer's cart plus an
    optional promotion.

    Args:
        customer: object describing the buyer (the promotions read its
            attributes).
        cart: iterable of items exposing ``total()``; copied into a list.
        promotion: optional strategy object exposing ``discount(order)``.
    """

    def __init__(self, customer, cart, promotion=None):
        self.customer = customer
        self.cart = list(cart)
        self.promotion = promotion

    def total(self):
        """Return the sum of all line totals, before any discount.

        The previous ``hasattr(self, '__total')`` guard could never be true:
        inside the class ``self.__total`` is name-mangled to
        ``_Order__total`` while the string literal is not, so the "cache" was
        recomputed on every call anyway. Computing directly keeps the
        observable behavior and stays correct if the cart changes.
        """
        return sum(item.total() for item in self.cart)

    def due(self):
        """Return the amount to pay after applying the promotion, if any."""
        if self.promotion is None:
            discount = 0
        else:
            discount = self.promotion.discount(self)
        return self.total() - discount

    def __repr__(self):
        fmt = '<Order total: {:.2f} due: {:.2f}>'
        return fmt.format(self.total(), self.due())
class Promotion(ABC):
    """Strategy interface: an abstract base class for discount policies."""

    @abstractmethod
    def discount(self, order):
        """Return the discount as a positive amount."""
# 有1000或以上积分的顾客,每个订单享5%折扣
class FidelityPromo(Promotion):
    """First concrete strategy: 5% off for customers with 1000 or more
    fidelity points."""

    def discount(self, order):
        """Return 5% of the order total for qualifying customers, else 0."""
        if order.customer.fidelity >= 1000:
            return order.total() * 0.05
        return 0
# 同一订单中,单个商品的数量达到20个或以上,享10%折扣
class BulkItemPromo(Promotion):
    """Second concrete strategy: 10% off every line item with a quantity of
    20 or more."""

    def discount(self, order):
        """Return the summed 10% discounts of all qualifying line items."""
        return sum(item.total() * 0.1
                   for item in order.cart
                   if item.quantity >= 20)
# 订单中的不同商品达到10个或以上时提供7%折扣
class LargeOrderPromo(Promotion):
    """Third concrete strategy: 7% off orders with 10 or more distinct
    products."""

    def discount(self, order):
        """Return 7% of the order total when enough distinct products are
        in the cart, else 0."""
        # A set removes duplicate product names before counting.
        distinct_products = set(item.product for item in order.cart)
        if len(distinct_products) < 10:
            return 0
        return order.total() * 0.07
# 1
joe = Customer('John Doe', 0)
ann = Customer('Ann Smith', 1100)
cart = [LineItem('banana', 4, 0.5), LineItem('apple', 10, 1.5), LineItem('watermellon', 5, 5.0)]
print(Order(joe, cart, FidelityPromo()))
print(Order(ann, cart, FidelityPromo()))
# 2
banana_cart = [LineItem('banana', 30, 0.5), LineItem('apple', 10, 1.5)]
print(Order(joe, banana_cart, BulkItemPromo()))
# 3
long_order = [LineItem(str(item_code), 1, 1.0) for item_code in range(10)]
print(Order(joe, long_order, LargeOrderPromo()))
# # 使用函数实现策略模式
# from collections import namedtuple
# Customer = namedtuple('Customer', 'name fidelity')
# class LineItem:
# def __init__(self, product, quantity, price):
# self.product = product
# self.quantity = quantity
# self.price = price
# def total(self):
# return self.price * self.quantity
# class Order: # 上下文
# def __init__(self, customer, cart, promotion=None):
# self.customer = customer
# self.cart = list(cart)
# self.promotion = promotion
# def total(self):
# if not hasattr(self, '__total'):
# self.__total = sum(item.total() for item in self.cart)
# return self.__total
# def due(self):
# if self.promotion is None:
# discount = 0
# else:
# discount = self.promotion(self)
# return self.total() - discount
# def __repr__(self):
# fmt = '<Order total: {:.2f} due: {:.2f}>'
# return fmt.format(self.total(), self.due())
#
# def fidelity_promo(order):
# """为积分为1000或以上的顾客提供5%折扣"""
# return order.total() * .05 if order.customer.fidelity >= 1000 else 0
# def bulk_item_promo(order):
# """单个商品为20个或以上时提供10%折扣"""
# discount = 0
# for item in order.cart:
# if item.quantity >= 20:
# discount += item.total() * .1
# return discount
# def large_order_promo(order):
# """订单中的不同商品达到10个或以上时提供7%折扣"""
# distinct_items = {item.product for item in order.cart}
# if len(distinct_items) >= 10:
# return order.total() * .07
# return 0
# globals返回一个字典表示当前的全局符号表。
# 这个符号表始终针对当前模块(对函数或方法来说,是指定义它们的模块,而不是调用它们的模块
# promos = [globals()[name] for name in globals()
# if name.endswith('_promo')
# and name != 'best_promo']
# def best_promo(order):
# """选择可用的最佳折扣
# """
# return max(promo(order) for promo in promos)
#.7.函数装饰器和闭包
from dis import dis
import time
from functools import lru_cache, wraps
import html
# 装饰器
def deco(func):
    """Decorator that replaces ``func`` with a wrapper announcing each call.

    The returned ``inner`` takes no arguments and returns None; it prints a
    marker line and then calls ``func()``.
    """
    def inner():
        print('running inner function')
        func()
    return inner
@deco
def target():
    """Print a marker line; the name ``target`` is bound to whatever ``deco``
    returns for this function."""
    print('running target function')
# target() # 相当于调用deco(target)
# 何时执行装饰器
# 函数装饰器在导入模块立即执行,而被装饰函数只在明确调用时运行
# Functions collected by the ``register`` decorator, in decoration order.
registry = []


def register(func):
    """Decorator that records ``func`` in ``registry`` and returns it unchanged.

    The print runs when the decorator is applied (i.e. when the decorated
    ``def`` statement executes), not when ``func`` is later called.
    """
    print('running register(%s)' % func)
    registry.append(func)
    return func
# f1 and f2 are decorated, so they are appended to ``registry`` as soon as
# their definitions execute.
@register
def f1():
    print('running f1()')


@register
def f2():
    print('running f2()')


# f3 and f4 are plain functions: they are never registered.
def f3():
    print('running f3()')


def f4():
    print('running f4()')
print('registry ->', registry)
f1()
f2()
f3()
f4()
# 变量作用域 global
# 闭包
def make_averager():
    """Return a closure that computes the running average of the values
    passed to it.

    ``series`` is a free variable of ``averager``: it lives on in the closure
    after ``make_averager`` returns. Because the list is only mutated, never
    rebound, no ``nonlocal`` declaration is needed.
    NOTE: the script prints this closure's ``co_varnames``/``co_freevars``,
    so local and free variable names here are part of the visible output.
    """
    series = []

    def averager(new_value):
        series.append(new_value)
        return sum(series)/len(series)
    return averager
avg = make_averager()
print(avg(10))
print(avg(11))
print(avg(12))
print(avg.__code__.co_varnames) # 局部变量
print(avg.__code__.co_freevars) # 自由变量
# nonlocal声明(global是在函数内想对函数外的变量操作)
def make_averager1():
    """Return a running-average closure that stores only a count and a total.

    The two counters are immutable numbers that get rebound on every call, so
    the inner function must declare them ``nonlocal``.
    """
    total = 0
    count = 0

    def averager(new_value):
        # Without nonlocal the augmented assignments would create locals.
        nonlocal total, count
        total += new_value
        count += 1
        return total / count

    return averager
# 实现简单的装饰器,输出函数的运行时间
def clock(func):
    """Decorator that prints the run time, arguments and result of each call.

    The wrapper accepts both positional and keyword arguments (the earlier
    version rejected keyword arguments) and returns the wrapped function's
    result unchanged. ``functools.wraps`` preserves the wrapped function's
    name and other attributes.
    """
    @wraps(func)
    def clocked(*args, **kwargs):
        t0 = time.perf_counter()  # float seconds from a high-resolution clock
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - t0
        name = func.__name__
        arg_parts = [repr(arg) for arg in args]
        # Sorted so the printed line is deterministic for a given call.
        arg_parts.extend('%s=%r' % (key, value)
                         for key, value in sorted(kwargs.items()))
        arg_str = ', '.join(arg_parts)
        print('[%0.8fs] %s(%s) -> %r' % (elapsed, name, arg_str, result))
        return result
    return clocked
@clock
def snooze(seconds):
    """Sleep for ``seconds``; exists only to give ``clock`` something to time."""
    time.sleep(seconds)


@clock
def factorial(n):
    """Return n! recursively; every recursive call goes through ``clock``."""
    if n < 2:
        return 1
    return n * factorial(n - 1)
print('*' * 40, 'Calling snooze(123)')
snooze(0.123)
print('*' * 40, 'Calling factorial(6)')
print('6! =', factorial(6))
print(factorial.__name__)
print()
# 使用functools.lru_cache做备忘 Least Recently Used
# functools.lru_cache(maxsize=128, typed=False) maxsize参数指定存储多少个调用的结果
@lru_cache()
@clock
def fibonacci(n):
    """Return the n-th Fibonacci number.

    Decorators are stacked: ``lru_cache`` wraps the function returned by
    ``clock``, so a cached result is returned without invoking the clocked
    function again.
    """
    return n if n < 2 else fibonacci(n - 2) + fibonacci(n - 1)
print(fibonacci(6))
# 单分派泛函数
def htmlize(obj):
    """Return the HTML-escaped ``repr`` of ``obj`` wrapped in a ``<pre>`` tag."""
    return '<pre>{}</pre>'.format(html.escape(repr(obj)))
print(htmlize({1, 2, 3}))
print(htmlize(abs))
print(htmlize('Heimlich & Co.\n- a game'))
print(htmlize(42))
print(htmlize(['alpha', 66, {3, 2, 1}]))
print()
# 叠放装饰器
# @d1
# @d2
# def f():
# print('f')
# #等同于:
# def f():
# print('f')
# f = d1(d2(f))
# 参数化装饰器
# Functions currently registered through ``register1``.
registry1 = set()


def register1(active=True):
    """Decorator factory: build a decorator that registers or unregisters.

    Args:
        active: when true the decorated function is added to ``registry1``;
            otherwise it is removed from it if present.

    Returns:
        The actual decorator, which returns the function unchanged.
    """
    def decorate(func):
        print('running register(active=%s)->decorate(%s)'% (active, func))
        # discard() is a no-op when func is not a member, so it never raises.
        update = registry1.add if active else registry1.discard
        update(func)
        return func

    return decorate
# register1 must be called to obtain the real decorator, hence the
# parentheses even when no argument is given.
@register1(active=False)
def f1():
    print('running f1()')


@register1()
def f2():
    print('running f2()')


# Not decorated here; it is registered later by calling register1()(f3).
def f3():
    print('running f3()')
f1()
print(registry1)
register1()(f3) # 对f3进行修饰
print(registry1)
#.8.对象引用、可变性和垃圾回收
from copy import copy, deepcopy
import weakref
# 为对象贴上标签
a = [1, 2, 3]
b = a
b.append(4)
a.append(5)
print(a, b)
# 相等和引用(别名) ==比较值 is比较对象
charles = {'name': 'Charles L. Dodgson', 'born': 1832, 'balance': 950}
alex = {'name': 'Charles L. Dodgson', 'born': 1832, 'balance': 950}
print(charles == alex)
print(charles is alex) # False
# 元组的相对不可变性
t1 = (1, 2, [30, 40])
t2 = (1, 2, [30, 40])
print(id(t1[-1]))
t1[-1].append(50)
print(id(t1[-1]))
# 默认做浅复制(copy)
l1 = [3, [66, 55, 44], (7, 8, 9)]
l2 = l1.copy() # 等同于l2 = list(l1)
l1.append(100)
l1[1].remove(55)
print('l1:', l1)
print('l2:', l2)
l2[1] += [33, 22] # +=运算符就地修改列表,可变对象会有影响
l2[2] += (10, 11) # 不可变对象,没有问题
print('l1:', l1)
print('l2:', l2)
# 深拷贝deepcopy
# 深复制有时可能太深了。对象可能会引用不该复制的外部资源或单例值
# 实现特殊方法__copy__()和__deepcopy__(),控制copy和deepcopy的行为
# 函数的参数作为引用时
def fun(a, b):
    """Apply ``a += b`` to the parameter and return the result.

    For a mutable ``a`` (such as a list) the augmented assignment changes the
    caller's object in place; for immutable values (numbers, tuples) it only
    rebinds the local name, leaving the caller's object untouched.
    """
    a += b
    return a
a1 = 1
b1 = 2
print(fun(a1, b1))
print(a1, b1)
a2 = [1, 2]
b2 = [3, 4]
print(fun(a2, b2))
print(a2, b2)
a3 = (1, 2)
b3 = (3, 4)
print(fun(a3, b3))
print(a3, b3)
# 不要使用可变类型作为参数的默认值,建议使用None
class HauntedBus:
    """A school bus haunted by ghost passengers."""

    # NOTE: deliberate anti-example. The default list is created once, when
    # the function is defined, so every bus built without an argument shares
    # (and mutates) that same list. Use None as the default instead.
    def __init__(self, passengers=[]):
        self.passengers = passengers

    def pick(self, name):
        self.passengers.append(name)

    def drop(self, name):
        self.passengers.remove(name)
# 防御可变参数
class TwilightBus:
    """A school bus that makes passengers vanish from the caller's list.

    Args:
        passengers: optional list of names. When given it is stored by
            reference, so ``pick``/``drop`` mutate the caller's list.
    """

    def __init__(self, passengers=None):
        if passengers is None:
            # Previously None was stored as-is, so pick()/drop() on a bus
            # built without arguments failed with AttributeError.
            self.passengers = []
        else:
            # Aliasing kept on purpose: this class demonstrates the hazard.
            # The defensive alternative is: self.passengers = list(passengers)
            self.passengers = passengers

    def pick(self, name):
        """Add ``name`` to the passenger list."""
        self.passengers.append(name)

    def drop(self, name):
        """Remove ``name``; raises ValueError if it is not on the bus."""
        self.passengers.remove(name)
# del和垃圾回收
# 对象绝不会自行销毁;然而,无法得到对象时,可能会被当作垃圾回收。
# del语句删除名称,而不是对象
# 在CPython中,垃圾回收使用的主要算法是引用计数。
# 实际上,每个对象都会统计有多少引用指向自己。当引用计数归零时,对象立即就被销毁
s1 = {1, 2, 3}
s2 = s1
def bye():
    """Print a farewell line; registered below as a ``weakref.finalize``
    callback."""
    print('Gone with the wind...')
ender = weakref.finalize(s1, bye)
print(ender.alive)
del s1
print(ender.alive)
s2 = 'spam' # {1, 2, 3}无法获取,对象被销毁
print(ender.alive)
# 弱引用
# 弱引用不会增加对象的引用数量。
# 经常用在缓存中
a_set = {0, 1}
wref = weakref.ref(a_set)
print(wref)
print(wref())
a_set = {2, 3, 4}
print(wref() is None)
# WeakValueDictionary
# 实现的是一种可变映射,里面的值是对象的弱引用
class Cheese:
    """Simple record with a ``kind``; used as the value type stored in the
    ``WeakValueDictionary`` below."""

    def __init__(self, kind):
        self.kind = kind

    def __repr__(self):
        return 'Cheese(%r)' % self.kind
stock = weakref.WeakValueDictionary()
catalog = [Cheese('Red Leicester'), Cheese('Tilsit'), Cheese('Brie'), Cheese('Parmesan')]
for cheese in catalog:
stock[cheese.kind] = cheese # 临时变量引用了对象,这可能会导致该变量的存在时间比预期长
print(sorted(stock.keys()))
del catalog
print(sorted(stock.keys()))
del cheese # for循环中的变量cheese是全局变量,除非显式删除,否则不会消失
print(sorted(stock.keys()))
# 弱引用的局限
# 基本的list和dict实例不能作为所指对象,但是它们的子类可以
# set实例可以作为所指对象
# int和tuple实例不能作为弱引用的目标,甚至它们的子类也不行
# Python对不可变类型施加的把戏
tuple1 = (1, 2, 3)
tuple2 = tuple(tuple1)
tuple3 = tuple1[:]
print(tuple2 is tuple1)
print(tuple3 is tuple1) # 对元组t来说,t[:]不创建副本,而是返回同一个对象的引用
# 驻留
# 驻留是Python解释器内部使用的一个特性
# 比较字符串或整数是否相等时,应该使用==,而不是is
string1 = 'abc'
string2 = 'abc'
int1 = 15
int2 = 15
print(string1 is string2)
print(int1 is int2)
#.9.符合python风格的对象
from array import array
from math import hypot
from datetime import datetime
# 对象表示形式
# repr() 以便于开发者理解的方式返回对象的字符串表示形式。(直接输出)
# str() 以便于用户理解的方式返回对象的字符串表示形式。
class Vector2d:
    """Two-dimensional vector with the usual Pythonic object protocols:
    repr/str/bytes/format, equality, hashing, abs, bool and iteration."""

    # Class attribute used when converting instances to and from bytes.
    typecode = 'd'

    def __init__(self, x, y):
        # Double leading underscores are name-mangled to _Vector2d__x/__y.
        self.__x = float(x)
        self.__y = float(y)

    @property
    def x(self):
        """The x component; the getter shares its name with the property."""
        return self.__x

    @x.setter
    def x(self, value):
        self.__x = value

    @x.deleter
    def x(self):
        del self.__x

    @property
    def y(self):
        """The y component (read-only)."""
        return self.__y

    def __iter__(self):
        """Make the instance iterable so it can be unpacked."""
        return iter((self.x, self.y))

    def __repr__(self):
        # Iterability lets *self feed both components to format();
        # {!r} plays the role of %r.
        return '{}({!r}, {!r})'.format(type(self).__name__, *self)

    def __str__(self):
        return str(tuple(self))

    def __bytes__(self):
        """Return the typecode byte followed by the packed components."""
        header = bytes([ord(self.typecode)])
        payload = bytes(array(self.typecode, self))
        return header + payload

    def __eq__(self, other):
        return tuple(self) == tuple(other)

    def __abs__(self):
        return hypot(self.x, self.y)

    def __bool__(self):
        return bool(abs(self))

    def __format__(self, fmt_spec=''):
        """Apply ``fmt_spec`` to each component and render ``(x, y)``."""
        formatted = [format(component, fmt_spec) for component in self]
        return '({}, {})'.format(*formatted)

    def __hash__(self):
        # Objects that compare equal must produce the same hash.
        return hash(self.x) ^ hash(self.y)

    @classmethod
    def frombytes(cls, octets):
        """Alternative constructor: rebuild an instance from ``bytes(v)``."""
        typecode = chr(octets[0])
        components = memoryview(octets[1:]).cast(typecode)
        return cls(*components)
v1 = Vector2d(3, 4)
print(v1.x, v1.y)
x, y = v1
print(x, y)
print(v1)
v1_clone = eval(repr(v1))
print(v1 == v1_clone)
print(v1_clone)
print(bytes(v1))
print(abs(v1))
print(bool(v1), bool(Vector2d(0, 0)))
v2 = Vector2d(4, 5)
print(hash(v1), hash(v2))
print({v1, v2})
print()
# classmethod staticmethod
# classmethod 定义操作类,而不是操作实例的方法;最常见的用途是定义备选构造方法
# staticmethod 静态方法就是普通的函数,只是碰巧在类的定义体中,而不是在模块层定义
class Demo:
    """Contrasts what ``classmethod`` and ``staticmethod`` receive."""

    @classmethod
    def klassmeth(*args):
        # The class itself arrives as the first positional argument.
        return args

    @staticmethod
    def statmeth(*args):
        # A plain function living in the class body: no implicit argument.
        return args
# 格式化显示
brl = 1/2.43
print(brl)
print(format(brl, '0.4f'))
print('1 BRL = {rate:0.2f} USD'.format(rate=brl))
print(format(42, 'b'))
print(format(2/3, '.1%'))
now = datetime.now()
print(format(now, '%H:%M:%S'))
print("It's now {:%I:%M %p}".format(now))
# 使用__slots__类属性节省空间
# 这个类中的所有实例属性
# 会在各个实例中使用类似元组的结构存储实例变量,从而避免使用消耗内存的 __dict__ 属性
# class Vector2d:
# __slots__ = ('__x', '__y')
# typecode = 'd'
# 每个子类都要定义 __slots__ 属性,因为解释器会忽略继承的 __slots__ 属性。
# 实例只能拥有 __slots__ 中列出的属性,除非把 '__dict__' 加入 __slots__ 中(这样做就失去了节省内存的功效)。
# 如果不把 '__weakref__' 加入 __slots__ ,实例就不能作为弱引用的目标。
# 类属性可用于为实例属性提供默认值
#.10.序列的修改、散列和切片
import numbers
from array import array
import reprlib
import math
from functools import reduce
import operator
from itertools import zip_longest
class Vector:
    """N-dimensional vector whose components are stored in an ``array``.

    Supports the sequence protocol (``len``, indexing, slicing), read-only
    ``x``/``y``/``z``/``t`` shortcuts for the first four components, hashing,
    equality, unary ``-``/``+``, vector addition and scalar multiplication.
    """

    typecode = 'd'
    shortcut_names = 'xyzt'

    def __init__(self, components):
        self._components = array(self.typecode, components)  # "protected"

    def __iter__(self):
        return iter(self._components)

    def __repr__(self):
        # reprlib.repr abbreviates long arrays with '...'.
        components = reprlib.repr(self._components)
        # Keep only the bracketed list part of "array('d', [...])".
        components = components[components.find('['):-1]
        return 'Vector({})'.format(components)

    def __str__(self):
        return str(tuple(self))

    def __bytes__(self):
        return (bytes([ord(self.typecode)]) + bytes(self._components))

    def __eq__(self, other):
        # The length check first avoids zip() silently ignoring extra items.
        return (len(self) == len(other)
                and all(a == b for a, b in zip(self, other)))

    def __hash__(self):
        hashes = map(hash, self._components)
        return reduce(operator.xor, hashes, 0)  # 0 is the initial value

    def __abs__(self):
        return math.sqrt(sum(x * x for x in self))

    def __bool__(self):
        return bool(abs(self))

    def __len__(self):
        return len(self._components)

    def __getitem__(self, index):
        """Return one component for an integer, a new Vector for a slice.

        Raises:
            TypeError: if ``index`` is neither an integer nor a slice.
        """
        cls = type(self)
        if isinstance(index, slice):
            return cls(self._components[index])
        elif isinstance(index, numbers.Integral):
            return self._components[index]
        else:
            msg = '{cls.__name__} indices must be integers'
            raise TypeError(msg.format(cls=cls))

    def __getattr__(self, name):
        """Resolve the one-letter shortcuts; only called when normal
        attribute lookup fails."""
        cls = type(self)
        if len(name) == 1:
            pos = cls.shortcut_names.find(name)
            if 0 <= pos < len(self._components):
                return self._components[pos]
        msg = '{.__name__!r} object has no attribute {!r}'
        raise AttributeError(msg.format(cls, name))

    def __setattr__(self, name, value):
        """Reject assignment to single lowercase-letter attributes so the
        shortcuts stay read-only; all other names are set normally."""
        cls = type(self)
        if len(name) == 1:
            if name in cls.shortcut_names:
                error = 'readonly attribute {attr_name!r}'
            elif name.islower():
                error = "can't set attributes 'a' to 'z' in {cls_name!r}"
            else:
                error = ''
            if error:
                msg = error.format(cls_name=cls.__name__, attr_name=name)
                raise AttributeError(msg)
        super().__setattr__(name, value)

    # Unary operators: - (__neg__), + (__pos__), ~ (__invert__), abs (__abs__)
    def __neg__(self):
        return Vector(-x for x in self)

    def __pos__(self):
        return Vector(self)

    # Binary operators: + (__add__), * (__mul__), == (__eq__), != (__ne__)
    def __add__(self, other):
        """Return the component-wise sum; the shorter operand is padded
        with 0.0."""
        try:
            pairs = zip_longest(self, other, fillvalue=0.0)
            return Vector(a + b for a, b in pairs)
        except TypeError:
            # NotImplemented is a singleton value, not an exception: it must
            # be returned so Python can try the other operand's reflected
            # method. ``raise NotImplemented`` is itself an error.
            return NotImplemented

    def __radd__(self, other):
        # Addition is commutative here, so the reflected form just delegates.
        return self + other

    def __mul__(self, scalar):
        if isinstance(scalar, numbers.Real):
            return Vector(n * scalar for n in self)
        else:
            return NotImplemented

    def __rmul__(self, scalar):
        return self * scalar

    def __ne__(self, other):
        eq_result = self == other
        if eq_result is NotImplemented:
            return NotImplemented
        else:
            return not eq_result

    @classmethod
    def frombytes(cls, octets):
        """Alternative constructor: rebuild a Vector from ``bytes(v)``."""
        typecode = chr(octets[0])
        # The memoryview reinterprets the payload bytes as typed items.
        memv = memoryview(octets[1:]).cast(typecode)
        return cls(memv)
# 协议和鸭子类型
# Python的序列协议只需要 __len__ 和 __getitem__ 两个方法
# 动态存取属性
# __getattr__ __setattr__
a = Vector(range(5))
print(a.t)
# 散列和快速等值测试
# __eq__ __hash__
#.11.接口:从协议到抽象基类
import abc
from collections import namedtuple
from collections.abc import MutableSequence
from random import choice, shuffle, SystemRandom, randrange
# 抽象基类(Abstract Base Class,ABC)
# 没有 __iter__ 和 __contains__ 方法,Python 会调用 __getitem__ 方法
class Foo:
    """Implements only ``__getitem__`` yet is still iterable.

    Without ``__iter__`` Python falls back to calling ``__getitem__`` with
    0, 1, 2, ... until ``IndexError`` is raised.
    """

    def __getitem__(self, pos):
        values = range(0, 30, 10)
        return values[pos]
f = Foo()
for i in f:
print(i)
# 猴子补丁
Card = namedtuple('Card', ['rank', 'suit'])


class FrenchDesk:
    """A 52-card deck exposing ``__len__`` and ``__getitem__`` only.

    Item assignment is not defined here; the script adds ``__setitem__`` to
    the class at run time (monkey patching) so that the deck can be shuffled.
    """

    ranks = [str(number) for number in range(2, 11)] + list('JQKA')
    suits = 'spades diamonds clubs hearts'.split()

    def __init__(self):
        # Suits form the outer loop, so the deck is grouped by suit.
        deck = []
        for suit in self.suits:
            for rank in self.ranks:
                deck.append(Card(rank, suit))
        self._cards = deck

    def __len__(self):
        return len(self._cards)

    def __getitem__(self, pos):
        return self._cards[pos]
def set_card(deck, position, card):
    """Store ``card`` at ``position`` in the deck's ``_cards`` list.

    The three parameters play the roles of ``self``, ``key`` and ``value``:
    the function is attached to the class as ``__setitem__`` below.
    """
    deck._cards[position] = card
deck = FrenchDesk()
FrenchDesk.__setitem__ = set_card
shuffle(deck)
print(deck[:5])
# 定义抽象基类的子类
class FrenchDesk2(MutableSequence):
    """Card deck declared as a ``MutableSequence`` subclass.

    The ABC requires ``__getitem__``, ``__setitem__``, ``__delitem__``,
    ``__len__`` and ``insert``; the remaining sequence methods are inherited
    from it.
    """

    ranks = [str(number) for number in range(2, 11)] + list('JQKA')
    suits = 'spades diamonds clubs hearts'.split()

    def __init__(self):
        deck = []
        for suit in self.suits:
            for rank in self.ranks:
                deck.append(Card(rank, suit))
        self._cards = deck

    def __len__(self):
        return len(self._cards)

    def __getitem__(self, pos):
        return self._cards[pos]

    def __setitem__(self, position, value):  # required by MutableSequence
        self._cards[position] = value

    def __delitem__(self, position):  # required by MutableSequence
        del self._cards[position]

    def insert(self, position, value):  # required by MutableSequence
        self._cards.insert(position, value)
# 标准库的抽象基类
# Iterable 、 Container 和 Sized
# Sequence 、 Mapping 和 Set
# MappingView
# Callable 和 Hashable
# Iterator
# 抽象基类的数字塔
# Number Complex Real Rational Integral
class Tombola(abc.ABC):
    """Abstract base class for containers that give items back at random."""

    @abc.abstractmethod
    def load(self, iterable):
        """Add items from an iterable."""

    @abc.abstractmethod
    def pick(self):
        """Remove an item at random and return it.

        This method should raise `LookupError` when the instance is empty.
        """

    def loaded(self):
        """Return `True` if there is at least one item, `False` otherwise."""
        return bool(self.inspect())

    def inspect(self):
        """Return a sorted tuple of the items currently held."""
        # Only the abstract interface is available here, so drain the
        # container with pick() and put everything back with load().
        drained = []
        try:
            while True:
                drained.append(self.pick())
        except LookupError:
            pass
        self.load(drained)
        return tuple(sorted(drained))
class BingoCage(Tombola):
    """Tombola that shuffles its items on loading and pops from the end."""

    def __init__(self, items):
        self._randomizer = SystemRandom()  # OS-provided randomness source
        self._items = []
        self.load(items)

    def load(self, items):
        """Add ``items`` and reshuffle the whole cage."""
        self._items.extend(items)
        self._randomizer.shuffle(self._items)

    def pick(self):
        """Remove and return one item.

        Raises:
            LookupError: if the cage is empty.
        """
        try:
            return self._items.pop()
        except IndexError:
            raise LookupError('pick from empty BingoCage')

    def __call__(self):
        """Shortcut for ``pick()``.

        The picked item is now returned; previously it was removed from the
        cage but discarded, so calling the instance always produced None.
        """
        return self.pick()
class LotteryBlower(Tombola):
    """Tombola that picks from a random position instead of shuffling.

    ``loaded`` and ``inspect`` are overridden to read the internal list
    directly rather than draining and reloading it.
    """

    def __init__(self, iterable):
        # Copy the input so the caller's sequence is never mutated.
        self._balls = list(iterable)

    def load(self, iterable):
        self._balls.extend(iterable)

    def pick(self):
        """Remove and return a ball from a random position.

        Raises:
            LookupError: if there are no balls left.
        """
        ball_count = len(self._balls)
        try:
            # randrange(0) raises ValueError, which signals an empty blower.
            position = randrange(ball_count)
        except ValueError:
            raise LookupError('pick from empty LotteryBlower')
        return self._balls.pop(position)

    def loaded(self):
        return bool(self._balls)

    def inspect(self):
        return tuple(sorted(self._balls))
# 虚拟子类
# 注册虚拟子类的方式是在抽象基类上调用 register 方法
# issubclass 和 isinstance 等函数都能识别,但是注册的类不会从抽象基类中继承任何方法或属性
@Tombola.register
class TomboList(list):
    """A ``list`` subclass registered as a virtual subclass of Tombola.

    Registration makes ``issubclass``/``isinstance`` checks against Tombola
    succeed, but nothing is inherited from the ABC.
    """

    def pick(self):
        """Remove and return an item from a random position.

        Raises:
            LookupError: if the list is empty.
        """
        if not self:
            raise LookupError('pop from empty TomboList')
        return self.pop(randrange(len(self)))

    # Loading is exactly list.extend, so reuse it under the Tombola name.
    load = list.extend

    def loaded(self):
        return bool(self)

    def inspect(self):
        return tuple(sorted(self))
#.12.继承的优缺点
from collections import UserDict
from numbers import Integral
# 直接子类化内置类型(如dict、list或str)容易出错
# 因为内置类型的方法通常会忽略用户覆盖的方法
# 该继承collections模块中的类,例如UserDict、UserList和UserString
# 这些类做了特殊设计,因此易于扩展
class DoppelDict(dict):
    """``dict`` subclass that stores every assigned value twice in a list.

    Only explicit item assignment goes through this override; the built-in
    constructor and ``update`` bypass it.
    """

    def __setitem__(self, key, value):
        doubled = [value, value]
        super().__setitem__(key, doubled)
dd = DoppelDict(one=1)
dd['two'] = 2 # 调用__setitem__
dd.update(three=3)
print(dd)
class AnswerDict(dict):
    """``dict`` subclass whose item lookup always returns 42.

    Used below to show that ``dict.update`` on another dict does not go
    through this overridden ``__getitem__``.
    """

    def __getitem__(self, key):
        return 42
ad = AnswerDict(a='foo')
print(ad, ad['a'])
ad_copy = {}
ad_copy.update(ad)
print(ad_copy, ad_copy['a'])
class DoppelDict2(UserDict):
    """``UserDict`` subclass that stores every value twice in a list.

    Unlike a direct ``dict`` subclass, the constructor and ``update`` also
    go through this ``__setitem__``.
    """

    def __setitem__(self, key, value):
        doubled = [value, value]
        super().__setitem__(key, doubled)
dd = DoppelDict2(one=1)
dd['two'] = 2
dd.update(three=3)
print(dd)
# 多重继承和方法解析顺序
class A:
    """Root of the diamond hierarchy used to illustrate the MRO."""

    def ping(self):
        print('ping_a:', self)
class B(A):
    """Left branch of the diamond; defines its own ``pong``."""

    def pong(self):
        print('pong_b:', self)
class C(A):
    """Right branch of the diamond; defines a competing ``pong``."""

    def pong(self):
        print('pong_c:', self)
class D(B, C):  # the MRO searches B before C
    """Bottom of the diamond, inheriting from both B and C."""

    def ping(self):
        # super() follows the MRO; the explicit alternative is A.ping(self).
        super().ping()
        # A.ping(self)
        print('ping_d_myself:', self)

    def pingpong(self):
        """Call ping/pong through ``self``, through ``super()``, and by
        naming class C explicitly."""
        self.ping()
        super().ping()
        self.pong()
        super().pong()
        # Naming the class bypasses the MRO, so self must be passed by hand.
        C.pong(self)
# 方法解析顺序(Method Resolution Order,MRO) 使用C3算法(课程表算法)
print(D.__mro__)
d = D()
# d.pong() # 调用的是B的pong方法
# d.ping()
# d.pingpong()
print(Integral.__mro__)
#.13.正确重载运算符
#.14.可迭代对象、迭代器和生成器
import re
import reprlib
from collections.abc import Iterable, Iterator, Generator
from fractions import Fraction
import itertools
import operator
# Matches runs of word characters. A raw string is required: in a normal
# string literal '\w' is an invalid escape sequence, which newer Python
# versions warn about.
RE_WORD = re.compile(r'\w+')
class Sentence:
    """Iterable over the words of a text (classic iterator-pattern version).

    The words are extracted eagerly in the constructor; every call to
    ``__iter__`` hands out a fresh, independent iterator.
    """

    def __init__(self, text):
        self.text = text
        self.words = RE_WORD.findall(text)

    def __iter__(self):
        word_iterator = SentenceIterator(self.words)
        return word_iterator

    def __repr__(self):
        abbreviated = reprlib.repr(self.text)
        return 'Sentence(%s)' % abbreviated
# 不需要实现 __next__
# “支持多种遍历”,必须能从同一个可迭代的实例中获取多个独立的迭代器,而且各个迭代器要能维护自身的内部状态,
class SentenceIterator:
    """Iterator that walks a list of words while tracking its own position."""

    def __init__(self, words):
        self.words = words
        self.index = 0

    def __next__(self):
        """Return the next word, or raise StopIteration when exhausted."""
        try:
            word = self.words[self.index]
        except IndexError:
            raise StopIteration()
        else:
            self.index += 1
            return word

    def __iter__(self):
        """An iterator returns itself from ``__iter__``."""
        return self
class SentenceGenerator:
    """Sentence variant whose ``__iter__`` is a generator function."""

    def __init__(self, text):
        self.text = text
        self.words = RE_WORD.findall(text)

    def __iter__(self):
        # Delegating to the list yields exactly one word per step.
        yield from self.words

    def __repr__(self):
        abbreviated = reprlib.repr(self.text)
        return 'Sentence(%s)' % abbreviated
class SentenceGeneratorInert:
    """Lazy Sentence variant: words are matched only as they are consumed."""

    def __init__(self, text):
        # No word list is built up front; only the raw text is stored.
        self.text = text

    def __iter__(self):
        # finditer is itself lazy, producing match objects on demand.
        matches = RE_WORD.finditer(self.text)
        for found in matches:
            yield found.group()

    def __repr__(self):
        abbreviated = reprlib.repr(self.text)
        return 'Sentence(%s)' % abbreviated
class SentenceGeneratorExpr:
    """Lazy Sentence variant built on a generator expression."""

    def __init__(self, text):
        self.text = text

    def __iter__(self):
        # Not a generator function: it returns a generator expression object.
        matches = RE_WORD.finditer(self.text)
        return (found.group() for found in matches)

    def __repr__(self):
        abbreviated = reprlib.repr(self.text)
        return 'Sentence(%s)' % abbreviated
s = Sentence('"The time has come," the Walrus said,')
print(s)
print(iter(s))
for word in s:
print(word, end=' ')
print()
print(list(s))
# 解释器需要迭代对象 x 时,会自动调用 iter(x)
# (1) 检查对象是否实现了 __iter__ 方法,如果实现了就调用它,获取一个迭代器。
# (2) 如果没有实现 __iter__ 方法,但是实现了 __getitem__ 方法,Python 会创建一个迭代器,尝试按顺序(从索引 0 开始)获取元素。
# (3) 如果尝试失败,Python 抛出 TypeError 异常,通常会提示“C object is not iterable”(C对象不可迭代),其中 C 是目标对象所属的类。
# 可迭代的对象、迭代器、生成器
# 可迭代对象:如果对象实现了能返回迭代器的 __iter__方法,那么对象就是可迭代的。序列都可以迭代;实现了 __getitem__ 方法,而且其参数是从零开始的索引,这种对象也可以迭代
# 迭代器:包含__iter__方法和__next__方法,__iter__方法返回 self,__next__方法返回下一个可用的元素,如果没有元素了,抛出 StopIteration 异常
# 生成器: yield关键字
s = 'ABC'
for char in s:
print(char, end=' ')
print()
it = iter(s)
while True:
try:
print(next(it), end=' ')
except StopIteration:
del it
break
print()
def generator_test():
    """Generator function that yields the integers 0 through 4."""
    yield from range(5)
g = generator_test() # calling the generator function returns a generator object
print(next(g))  # take the first value by hand
for i in g:  # the loop consumes whatever the generator has left
print(i, end=' ')
print()
# 等差数列生成器
class ArithmeticProgression:
    """Iterable arithmetic progression ``begin, begin + step, ...``.

    Items are produced while they are strictly less than ``end``. With
    ``end=None`` the series never stops.
    """

    def __init__(self, begin, step, end=None):
        self.begin, self.step, self.end = begin, step, end

    def __iter__(self):
        # Coerce the first item to the type that ``begin + step`` produces,
        # so it has the same numeric type as the items that follow.
        coerce = type(self.begin + self.step)
        current = coerce(self.begin)
        unbounded = self.end is None
        # Each term is computed from a multiplier rather than by repeated
        # addition, which reduces accumulated floating point error.
        multiplier = 0
        while unbounded or current < self.end:
            yield current
            multiplier += 1
            current = self.begin + self.step * multiplier
def aritprog_gen(begin, step, end=None):
    """Generate the arithmetic progression ``begin, begin + step, ...``.

    Items are yielded while they are strictly less than ``end``; with
    ``end=None`` the generator never stops. The first item is coerced to
    the type of ``begin + step``.
    """
    current = type(begin + step)(begin)
    unbounded = end is None
    multiplier = 0  # terms are derived from this instead of repeated addition
    while True:
        if not unbounded and not current < end:
            return
        yield current
        multiplier += 1
        current = begin + step * multiplier
# Demo: finite progressions, followed by the itertools equivalents.
ap = ArithmeticProgression(0, 1, 3)
ap2 = ArithmeticProgression(0, Fraction(1, 3), 1)
print(list(ap2))
print(itertools.count(1, 0.5)) # itertools.count(start, step)
gen = itertools.takewhile(lambda n: n < 3, itertools.count(1, 0.5))
print(list(gen)) # takewhile stops as soon as the predicate returns False
print()
# 标准库中的生成器函数
# 过滤的生成器函数
# compress dropwhile filter filterfalse islice takewhile
def vowel(c):
    """Return True when the lower-cased ``c`` occurs in ``'aeiou'``."""
    lowered = c.lower()
    return lowered in 'aeiou'
def filter_gen():
    """Print what each filtering generator function produces for 'Aardvark'."""
    word = 'Aardvark'
    demos = (
        filter(vowel, word),  # keep items for which the predicate is true
        itertools.filterfalse(vowel, word),  # the opposite of filter
        itertools.dropwhile(vowel, word),  # skip leading truthy items, then yield the rest unchecked
        itertools.takewhile(vowel, word),  # yield while truthy, stop at the first falsy item
        itertools.compress(word, (1, 0, 1, 1, 0, 1)),  # yield items whose selector is truthy
        itertools.islice(word, 4),  # s[:stop]
        itertools.islice(word, 4, 7),  # s[start:stop]
        itertools.islice(word, 1, 7, 2),  # s[start:stop:step]
    )
    for lazy in demos:
        print(list(lazy))
# filter_gen()
# 映射的生成器函数
# accumulate enumerate map starmap
sample = [5, 4, 2, 8, 7, 6, 3, 0, 9, 1]


def map_gen():
    """Print what each mapping generator function produces."""
    letters = 'albatroz'
    demos = (
        # accumulate yields running totals; given a function it feeds the
        # previous result and the next item to it instead of adding.
        itertools.accumulate(sample),
        itertools.accumulate(sample, min),
        itertools.accumulate(sample, max),
        itertools.accumulate(sample, operator.mul),
        # enumerate(iterable, start=0) yields (index, item) pairs.
        enumerate(letters, 1),
        # map with N iterables calls the function with N arguments.
        map(operator.mul, range(11), range(11)),
        # starmap unpacks each item and calls func(*item).
        itertools.starmap(operator.mul, enumerate(letters, 1)),
    )
    for lazy in demos:
        print(list(lazy))
# map_gen()
# 合并的生成器函数
# chain chain.from_iterable product zip zip_longest
def merge_gen():
    """Print what each merging generator function produces."""
    letters = 'ABC'
    demos = (
        # chain yields every item of the first iterable, then the next, ...
        itertools.chain(letters, range(2)),
        itertools.chain(enumerate(letters)),
        # from_iterable flattens one level: the items must be iterables.
        itertools.chain.from_iterable(enumerate(letters)),
        # zip stops silently as soon as the shortest input is exhausted.
        zip(letters, range(5)),
        zip(letters, range(5), [10, 20, 30, 40]),
        # zip_longest runs to the longest input, padding with fillvalue
        # (None by default).
        itertools.zip_longest(letters, range(5)),
        itertools.zip_longest(letters, range(5), fillvalue='?'),
        # Cartesian product.
        itertools.product(letters, range(2)),
    )
    for lazy in demos:
        print(list(lazy))
# merge_gen()
# 扩展的生成器函数
# combinations combinations_with_replacement count cycle permutations repeat
def ext_gen():
    """Print what each expanding generator function produces."""
    take = itertools.islice
    letters = 'ABC'
    demos = (
        take(itertools.count(1, 0.3), 3),  # endless numbers from start, in steps of step
        take(itertools.cycle(letters), 7),  # repeat the items in order, endlessly
        itertools.repeat(8, 4),  # the same item over and over, `times` times if given
        itertools.combinations(letters, 2),  # combinations
        itertools.combinations_with_replacement(letters, 2),  # combinations, repeats allowed
        itertools.permutations(letters, 2),  # permutations
    )
    for lazy in demos:
        print(list(lazy))
# ext_gen()
# 重新排列元素的生成器函数
# groupby reversed tee
# Ordered by length, longest first, so that groupby(animals, len) sees equal
# lengths next to each other.
animals = sorted(
    ['duck', 'eagle', 'rat', 'giraffe', 'bear', 'bat', 'dolphin', 'shark', 'lion'],
    key=len,
    reverse=True,
)


def rearrange_gen():
    """Print what the rearranging generator functions produce."""
    # groupby yields (key, group) pairs, where group is itself an iterator
    # over the consecutive items sharing that key.
    print(list(itertools.groupby('LLLLAAGGG')))
    by_length = itertools.groupby(animals, len)
    for length, group in by_length:
        print(length, '->', list(group), end=' ')
    print()
    # tee returns a tuple of independent iterators over the same input.
    print(list(itertools.tee('ABC')))
# rearrange_gen()
# yield from
# 生成器函数需要产出另一个生成器生成的值
def chain(*iterables):
    """Yield every item of each iterable in turn, using explicit nested loops."""
    for source in iterables:
        for item in source:
            yield item
def chain2(*iterables):
    """Yield every item of each iterable in turn, delegating with ``yield from``."""
    for source in iterables:
        yield from source
# Demo: chain a string and a tuple of integers into one list.
s = 'ABC'
t = tuple(range(3))
print(list(chain2(s, t)))
# 可迭代的归约函数
# all any max min reduce sum
#.15.上下文管理器和else模块
# for/else while/else try/else
# for/else
# 仅当 for 循环运行完毕时(即 for 循环没有被 break 语句中止)才运行 else 块
# while/else
# 仅当 while 循环因为条件为假值而退出时(即 while 循环没有被 break 语句中止)才运行 else 块
# try/else
# 仅当 try 块中没有异常抛出时才运行 else 块
# try/except/else
# with简化try/finally
# __enter__ __exit__
# contextlib模块中的实用工具
# closing suppress @contextmanager ContextDecorator ExitStack
#.16.协程
from inspect import getgeneratorstate
from functools import wraps
from collections import namedtuple
# 协程可以处于四个状态的一个
# 'GEN_CREATED'
# 等待开始执行。
# 'GEN_RUNNING'
# 解释器正在执行。
# 'GEN_SUSPENDED'
# 在 yield 表达式处暂停。
# 'GEN_CLOSED'
# 执行结束。
def simple_coroutine():
    """Coroutine that, five times over, announces itself, waits for a value and echoes it."""
    rounds_left = 5
    while rounds_left:
        print('-> coroutine started')
        received = yield  # suspends here until the caller sends a value
        print('-> coroutine received:', received)
        rounds_left -= 1
# my_coro = simple_coroutine()
# next(my_coro) # "预激"协程
# my_coro.send(42)
# my_coro.send(100)
def simple_coro2(a):
    """Coroutine that alternately yields ``a`` and ``a`` plus the value last sent.

    Each ``yield`` hands a value to the caller, and the value passed to
    ``send`` becomes the result of that ``yield`` expression.
    """
    while True:
        print('-> Started: a =', a)
        first = yield a
        print('-> Received: b =', first)
        second = yield a + first
        print('-> Received: c =', second)
# yield a 和 yield a + b 是作为返回值
# send是作为输入值
# my_coro2 = simple_coro2(14)
# print(getgeneratorstate(my_coro2))
# next(my_coro2)
# print(getgeneratorstate(my_coro2))
# my_coro2.send(28)
# my_coro2.send(99)
# print(getgeneratorstate(my_coro2))
# 计算移动平均值
def averager():
    """Coroutine computing a running average.

    Yields the current average (``None`` before any value has been sent)
    and receives the next term through ``send``.
    """
    running_sum, n_terms = 0.0, 0
    mean = None
    while True:
        running_sum += yield mean
        n_terms += 1
        mean = running_sum / n_terms
# coro_avg = averager()
# next(coro_avg)
# print(coro_avg.send(10))
# print(coro_avg.send(30))
# print(coro_avg.send(5))
# 预激协程的装饰器
# 不需要起初调用next方法
def coroutine(func):
    """Decorator: prime ``func`` by advancing it to its first ``yield``."""
    @wraps(func)
    def start_primed(*args, **kwargs):
        primed = func(*args, **kwargs)
        next(primed)  # run up to the first yield so send() works right away
        return primed
    return start_primed
# 终止协程和异常处理
# generator.throw(exc_type[, exc_value[, traceback]])
# generator.close()
class DemoException(Exception):
    """An exception type defined for this demonstration."""


def demo_exc_handling():
    """Coroutine that echoes the values it receives and survives DemoException.

    Any other exception thrown into it is not handled here and ends the
    coroutine.
    """
    print('-> coroutine started')
    while True:
        try:
            received = yield
        except DemoException:
            print('*** DemoException handled. Continuing...')
            continue
        print('-> coroutine received: {!r}'.format(received))
# Demo: drive the coroutine with two values, then shut it down.
exc_coro = demo_exc_handling()
next(exc_coro)  # prime the coroutine
exc_coro.send(11)
exc_coro.send(22)
# exc_coro.throw(DemoException)
exc_coro.close()
# 如果不管协程如何结束都想做些清理工作,要把协程定义体中相关的代码放入try/finally块中
# 让协程返回值
Result = namedtuple('Result', 'count average')


def averager2():
    """Coroutine that averages the values sent to it and returns a ``Result``.

    Sending ``None`` ends the coroutine. The ``Result(count, average)`` then
    travels to the caller as the ``value`` of the StopIteration exception.
    """
    running_sum, n_terms, mean = 0.0, 0, None
    term = yield
    while term is not None:
        running_sum += term
        n_terms += 1
        mean = running_sum / n_terms
        term = yield
    return Result(n_terms, mean)
# Demo: feed three values, then send None to make the coroutine return.
coro_avg = averager2()
next(coro_avg)  # prime the coroutine
coro_avg.send(10)
coro_avg.send(30)
coro_avg.send(6.5)
result = Result(4, 5) # placeholder, replaced below when StopIteration arrives
try:
coro_avg.send(None)
except StopIteration as exc:
result = exc.value  # the coroutine's return value
print(result)
# 使用yield from
# 调用方 -> 委派生成器 -> 子生成器
def grouper(results, key):
    """Delegating generator: store each finished ``averager2`` result under ``key``.

    Values sent to this generator are passed on to the subgenerator. When
    the subgenerator returns, its result is saved in ``results[key]`` and a
    new subgenerator is started.
    """
    while True:
        outcome = yield from averager2()
        results[key] = outcome
def mainfun(data):
    """Average every series in ``data`` through ``grouper`` and print a report.

    ``data`` maps a key to an iterable of numbers.
    """
    results = {}
    for key, values in data.items():
        delegate = grouper(results, key)
        next(delegate)  # prime the delegating generator
        for number in values:
            delegate.send(number)
        # Important: None ends the current averager2 so its result is stored.
        delegate.send(None)
    # print(results)  # uncomment to debug
    report(results)
# 输出报告
def report(results):
    """Print one line per entry of ``results``, ordered by key.

    Keys have the form ``'group;unit'``. Values expose ``count`` and
    ``average`` attributes.
    """
    line = '{:2} {:5} averaging {:.2f}{}'
    for key in sorted(results):
        outcome = results[key]
        group, unit = key.split(';')
        print(line.format(outcome.count, group, outcome.average, unit))
# Sample input: each key is 'group;unit' (report() splits it on ';') and each
# value is the list of measurements to average.
data = {
'girls;kg': [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m': [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
mainfun(data)
#.17.使用期物处理并发
import os
import time
import sys
import requests
from concurrent import futures
# Two-letter codes of the flags to download, split from one string.
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'  # URL prefix used by get_flag
DEST_DIR = 'downloads/'  # directory that save_flag writes into
def save_flag(img, filename):
    """Write the bytes ``img`` to ``filename`` inside ``DEST_DIR``.

    The destination directory is created when it does not exist yet, so the
    first download no longer fails with FileNotFoundError.
    """
    # exist_ok=True keeps this safe when several threads save at once.
    os.makedirs(DEST_DIR, exist_ok=True)
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)
def get_flag(cc, timeout=10):
    """Download the flag image for code ``cc`` and return its bytes.

    ``timeout`` is the number of seconds handed to ``requests.get``, so a
    server that never answers cannot block the caller forever.

    Raises ``requests.HTTPError`` when the server answers with an error
    status, rather than returning the body of an error page as if it were
    an image.
    """
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()
    return resp.content
def show(text):
    """Print ``text`` followed by a space instead of a newline."""
    separator = ' '
    print(text, end=separator)
    # sys.stdout.flush()  # flush the buffer
def download_many(cc_list):
    """Download the flags for ``cc_list`` one after another, in sorted order.

    Returns the number of codes in ``cc_list``.
    """
    for code in sorted(cc_list):
        image = get_flag(code)
        show(code)
        filename = '{}.gif'.format(code.lower())
        save_flag(image, filename)
    return len(cc_list)
def mainfun(download_many):
    """Run ``download_many`` on ``POP20_CC`` and print the count and elapsed time."""
    started = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - started
    print('\n{} flags downloaded in {:.2f}s'.format(count, elapsed))
# mainfun(download_many)
# Concurrent download
MAX_WORKERS = 20  # upper bound on the thread pool size used by download_many2
def download_one(cc):
    """Download, announce and save the flag for ``cc``; return ``cc``.

    This is the unit of work handed to the thread pool.
    """
    image = get_flag(cc)
    show(cc)
    filename = '{}_futures.gif'.format(cc.lower())
    save_flag(image, filename)
    return cc
def download_many2(cc_list):
    """Download the flags for ``cc_list`` with a thread pool.

    Returns the number of results. An exception raised by a worker is
    re-raised when the results are collected.
    """
    if not cc_list:
        # ThreadPoolExecutor raises ValueError for max_workers=0, and there
        # is nothing to download anyway.
        return 0
    workers = min(MAX_WORKERS, len(cc_list))  # never more threads than jobs
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_one, sorted(cc_list))
    return len(list(res))
# mainfun(download_many2)
# 期物测试
def download_many3(cc_list):
    """Futures demo: download the first five codes with three worker threads.

    Prints each future when it is scheduled and again when it completes.
    Returns the number of results collected.
    """
    subset = sorted(cc_list[:5])
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        pending = []
        for cc in subset:
            future = executor.submit(download_one, cc)
            pending.append(future)
            print('Scheduled for {}: {}'.format(cc, future))
        results = []
        # as_completed yields the futures as they finish, not in the order
        # in which they were submitted.
        for future in futures.as_completed(pending):
            res = future.result()
            print('{} result: {!r}'.format(future, res))
            results.append(res)
    return len(results)
# mainfun(download_many3)
# Python interpreters: CPython (the default), PyPy, Psyco, Jython
# 阻塞型I/O和GIL
# CPython解释器本身就不是线程安全的,所以有全局解释器锁(GIL)
# 一次只允许使用一个线程执行Python字节码,因此Python进程通常不能同时使用多个CPU核心
# 使用多进程达到并发的效果,提升速率
#.18.使用asyncio包处理并发
import threading
import itertools
import time
import sys
class Signal:
    """Flag object: ``go`` starts out True and is set to False to request a stop."""

    go = True
def spin(msg, signal):
    """Animate a text spinner next to ``msg`` until ``signal.go`` becomes false.

    Each frame is written, then the cursor is moved back with backspace
    characters (``'\\x08'``) so the next frame overwrites it. On exit the
    last frame is blanked out.
    """
    out = sys.stdout
    for frame in itertools.cycle('|/-\\'):
        status = ' '.join((frame, msg))
        out.write(status)
        out.flush()
        rewind = '\x08' * len(status)
        out.write(rewind)
        time.sleep(.1)
        if not signal.go:
            break
    # Overwrite the status line with spaces and return the cursor to its start.
    out.write(' ' * len(status) + '\x08' * len(status))
def slow_function():
    """Pretend to wait on I/O for three seconds, then return 42."""
    delay = 3
    time.sleep(delay)
    answer = 42
    return answer
def supervisor():
    """Show a spinner in a second thread while ``slow_function`` runs.

    Returns the value computed by ``slow_function``. The spinner thread is
    told to stop and is joined even when ``slow_function`` raises, so a
    failure cannot leave it spinning forever.
    """
    signal = Signal()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    try:
        result = slow_function()
    finally:
        signal.go = False  # ask the spinner loop to exit
        spinner.join()  # wait until the spinner thread has finished
    return result
def mainfun():
    """Run the spinner demo and print the value it computed."""
    answer = supervisor()
    print('Answer:', answer)


mainfun()
#.19.动态属性和特性
# 属性attribute 数据的属性和处理数据的方法统称属性
# 特性property 在不改变类接口的前提下,使用存取方法(即读值方法和设值方法)修改数据属性
from collections import abc
import json
import keyword
import shelve
import warnings
JSON = 'testjs.json'  # path of the JSON file loaded below
with open(JSON) as f:
feed = json.load(f)  # parsed content; load_db reads feed['Schedule']
# print(feed)
class FrozenJSON:
    """Read-only facade for navigating a JSON-like object with attribute notation."""

    def __init__(self, mapping):
        self.__data = {}
        for key, value in mapping.items():
            if keyword.iskeyword(key):
                # A Python keyword cannot follow a dot, so 'class' becomes 'class_'.
                key += '_'
            self.__data[key] = value

    def __getattr__(self, name):
        """Resolve ``name`` as a dict method first, then as a key of the data.

        Raises AttributeError, not KeyError, when ``name`` is neither, so
        that ``hasattr`` and ``getattr`` with a default behave normally.
        """
        if hasattr(self.__data, name):
            return getattr(self.__data, name)
        try:
            value = self.__data[name]
        except KeyError:
            raise AttributeError(
                '{!r} object has no attribute {!r}'.format(
                    type(self).__name__, name)) from None
        return FrozenJSON.build(value)

    @classmethod
    def build(cls, obj):
        """Alternative constructor: wrap mappings, convert lists, pass the rest through."""
        if isinstance(obj, abc.Mapping):
            return cls(obj)
        elif isinstance(obj, abc.MutableSequence):
            return [cls.build(item) for item in obj]
        else:
            return obj
# Demo: the key 'class' is a Python keyword, so it is reached as `class_`.
grad = FrozenJSON({'name': 'Jim Bo', 'class': 1982})
print(grad.class_)
# 使用__new__灵活创建对象
class FrozenJSON2:
    """Read-only facade for navigating a JSON-like object with attribute notation.

    ``__new__`` decides what to build. A mapping becomes a ``FrozenJSON2``,
    a mutable sequence becomes a list of converted items, and anything else
    is returned unchanged.
    """

    def __new__(cls, arg):
        if isinstance(arg, abc.Mapping):
            return super().__new__(cls)
        elif isinstance(arg, abc.MutableSequence):
            return [cls(item) for item in arg]
        else:
            # Not an instance of cls, so Python does not call __init__.
            return arg

    def __init__(self, mapping):
        self.__data = {}
        for key, value in mapping.items():
            if keyword.iskeyword(key):
                # A Python keyword cannot follow a dot, so 'class' becomes 'class_'.
                key += '_'
            self.__data[key] = value

    def __getattr__(self, name):
        """Resolve ``name`` as a dict method first, then as a key of the data.

        Raises AttributeError, not KeyError, when ``name`` is neither, so
        that ``hasattr`` and ``getattr`` with a default behave normally.
        """
        if hasattr(self.__data, name):
            # getattr(x, 'a') is equivalent to x.a
            return getattr(self.__data, name)
        try:
            value = self.__data[name]
        except KeyError:
            raise AttributeError(
                '{!r} object has no attribute {!r}'.format(
                    type(self).__name__, name)) from None
        return FrozenJSON2(value)
# Demo: a list value is returned as a plain list of converted items.
grad2 = FrozenJSON2({'name': ['Jim Bo', 'a'], 'class': 1982})
print(grad2.name)
DB_NAME = 'schedule1_db'  # name passed to shelve.open below
CONFERENCE = 'conference.115'  # key whose absence from db triggers load_db
class Record:
    """Plain object whose attributes are taken from keyword arguments."""

    def __init__(self, **kwargs):
        # No __slots__ are declared, so the instance __dict__ can take a
        # whole batch of attributes in one update.
        vars(self).update(kwargs)
def load_db(db):
    """Store every record of ``feed['Schedule']`` in ``db`` as a ``Record``.

    Keys have the form ``'<record type>.<serial>'``. Each record's own
    ``'serial'`` entry is overwritten with that key.
    """
    raw_data = feed
    warnings.warn('loading ' + DB_NAME)
    for collection, entries in raw_data['Schedule'].items():
        kind = collection[:-1]  # drop the trailing 's'
        for entry in entries:
            record_key = '{}.{}'.format(kind, entry['serial'])
            entry['serial'] = record_key
            db[record_key] = Record(**entry)
db = shelve.open(DB_NAME) # open the shelf that persists the records
if CONFERENCE not in db:  # load the data unless this key is already stored
load_db(db)
speaker = db['speaker.157509']
print(type(db))
if hasattr(speaker, 'name') and hasattr(speaker, 'twitter'):
print(speaker.name, speaker.twitter)
db.close()
# 特性会覆盖实例属性
# Demo class: `data` is a plain class attribute and `prop` is a property
# with a getter only.
class A:
data = 'the class data attr'
@property
def prop(self):
return 'the prop value'
a = A()
# Assigning to `data`: the instance attribute shadows the class attribute
print(vars(a)) # same as a.__dict__: the instance's own attributes and values
print(a.data)
a.data = 'bar'
print(a.__dict__) # {'data': 'bar'}
print(A.__dict__)
# Assigning to `prop`: an instance attribute does not shadow the class property
print(a.prop)
# a.prop = 'foo' # no setter is defined
a.__dict__['prop'] = 'foo'
print(a.prop)
print(a.__dict__)
A.prop = 'baz' # overwrites A.prop, destroying the property object
print(a.prop)
# A property newly added to the class shadows an existing instance attribute
print(A.data)
print(a.data)
A.data = property(lambda self: 'the "data" prop value')
print(a.data)
del A.data
print(a.data)
# obj.attr 这样的表达式不会从 obj 开始寻找 attr ,而是从 obj.__class__ 开始,
# 而且,仅当类中没有名为 attr 的特性时,Python 才会在 obj 实例中寻找。
# 这条规则不仅适用于特性,还适用于一整类描述符——覆盖型描述符(overriding descriptor)
#.20.属性描述符
import collections
# 描述符是实现特定协议的类
# __get__ __set__ __delete__中实现一个或多个
# 用法:创建一个实例,作为另一个类的属性
class Quantity:
    """Descriptor that only accepts values greater than zero.

    ``storage_name`` is the key under which the value is kept in the
    managed instance's ``__dict__``.
    """

    def __init__(self, storage_name):
        self.storage_name = storage_name

    def __set__(self, instance, value):
        """Store ``value`` on ``instance``; raise ValueError unless it is > 0.

        ``self`` is the descriptor instance and ``instance`` is the managed
        instance.
        """
        if not value > 0:
            raise ValueError('value must be > 0')
        # Write to __dict__ directly: setattr(instance, ...) would call this
        # __set__ again and recurse forever.
        instance.__dict__[self.storage_name] = value
class LineItem:
    """Managed class: ``weight`` and ``price`` are validated by ``Quantity``."""

    # Descriptor instances are declared as class attributes of the managed class.
    weight = Quantity('weight')
    price = Quantity('price')

    def __init__(self, description, weight, price):
        self.description = description
        # Both assignments go through Quantity.__set__, weight first.
        self.weight, self.price = weight, price

    def subtotal(self):
        """Return the weight multiplied by the price."""
        return self.weight * self.price


truffle = LineItem('White truffle', 100, 20)
class Quantity2:
    """Descriptor that only accepts values greater than zero.

    Every instance generates its own storage attribute name, so the managed
    class does not have to repeat the attribute name.
    """

    __counter = 0  # number of descriptors created so far

    def __init__(self):
        cls = self.__class__
        prefix = cls.__name__
        index = cls.__counter
        # For example '_Quantity2#0'. The '#' keeps the name from clashing
        # with attributes written using dot notation.
        self.storage_name = '_{}#{}'.format(prefix, index)
        cls.__counter += 1

    # getattr and setattr are safe here, with no need for instance.__dict__,
    # because the managed attribute and the storage attribute have
    # different names.
    def __get__(self, instance, owner):
        """Return the stored value, or the descriptor itself on class access.

        ``owner`` is the managed class. When the attribute is read through
        the class, ``instance`` is None and there is no stored value to
        return.
        """
        print(owner)
        if instance is None:
            return self
        return getattr(instance, self.storage_name)

    def __set__(self, instance, value):
        """Store ``value`` on ``instance``; raise ValueError unless it is > 0."""
        if value > 0:
            setattr(instance, self.storage_name, value)
        else:
            raise ValueError('value must be > 0')
class LineItem2:
    """Managed class: ``weight`` and ``price`` are validated by ``Quantity2``."""

    # Descriptor instances are declared as class attributes of the managed class.
    weight = Quantity2()
    price = Quantity2()

    def __init__(self, description, weight, price):
        self.description = description
        # Both assignments go through Quantity2.__set__, weight first.
        self.weight, self.price = weight, price

    def subtotal(self):
        """Return the weight multiplied by the price."""
        return self.weight * self.price
# truffle2 = LineItem2('White truffle', 100, 20)
# print(truffle2.weight)
# 覆盖型与非覆盖型描述符对比
def cls_name(obj_or_cls):
    """Return the unqualified class name of a class, or of an instance's class."""
    cls = obj_or_cls if type(obj_or_cls) is type else type(obj_or_cls)
    return cls.__name__.rpartition('.')[2]
def display(obj):
    """Return a short textual form of ``obj`` for the descriptor demos.

    Classes appear as ``<class Name>``, ``None`` and ints as their repr,
    and everything else as ``<Name object>``.
    """
    cls = type(obj)
    if cls is type:
        return '<class {}>'.format(obj.__name__)
    if cls in (type(None), int):
        return repr(obj)
    return '<{} object>'.format(cls_name(obj))
def print_args(name, *args):
    """Print a pseudo call such as ``-> Cls.__name__(arg1, arg2)``.

    The class shown is that of the first positional argument, and every
    argument is rendered with ``display``.
    """
    shown = ', '.join(display(arg) for arg in args)
    owner_name = cls_name(args[0])
    print('-> {}.__{}__({})'.format(owner_name, name, shown))
### 对这个示例重要的类 ###
class Overriding:
    """Also known as a data descriptor or enforced descriptor."""

    def __get__(self, instance, owner):
        """Report the read; nothing is returned."""
        print_args('get', self, instance, owner)

    def __set__(self, instance, value):
        """Report the write; nothing is stored."""
        print_args('set', self, instance, value)
class OverridingNoGet:
    """An overriding descriptor without a ``__get__`` method."""

    def __set__(self, instance, value):
        """Report the write; nothing is stored."""
        print_args('set', self, instance, value)
class OverridingNoSet:
    """Has no ``__set__``; also known as a non-data or shadowable descriptor."""

    def __get__(self, instance, owner):
        """Report the read; nothing is returned."""
        print_args('get', self, instance, owner)
class Managed:
    """Managed class holding one descriptor of each kind, plus a plain method."""

    over = Overriding()
    over_no_get = OverridingNoGet()
    non_over = OverridingNoSet()

    def spam(self):
        """Print a line showing that ``spam`` was called on this instance."""
        shown = display(self)
        print('-> Managed.spam({})'.format(shown))
# obj = Managed()
# print(obj.over)
# print(Managed.over)
# obj.over = 1
# obj.__dict__['over'] = 8
# print(vars(obj))
# print(obj.over)
# 方法是描述符
class Text(collections.UserString):
    """``UserString`` subclass with a custom repr and a ``reverse`` method."""

    def __repr__(self):
        return 'Text(%r)' % (self.data,)

    def reverse(self):
        """Return the text with its characters in reverse order."""
        backwards = slice(None, None, -1)
        return self[backwards]
# word = Text('forward')
# print(word)
# print(word.reverse())
# print(Text.reverse(Text('backward')))
# print(type(Text.reverse), type(word.reverse))
# print(list(map(Text.reverse, ['repaid', (10, 20, 30), Text('stressed')])))
# print(Text.reverse.__get__(word)) # 得到的是绑定方法
# print(Text.reverse.__get__(None, Text))
# print(word.reverse) # 相当于Text.reverse.__get__(word)
# print(word.reverse.__self__)
# print(word.reverse.__func__ is Text.reverse)
# 描述符用法建议
# 使用特性以保持简单;只读描述符必须有 __set__ 方法;
# 用于验证的描述符可以只有 __set__ 方法;仅有 __get__ 方法的描述符可以实现高效缓存
# 非特殊的方法可以被实例属性遮盖