Python 迭代器是怎么實現的?
楔子
只要類型對象實現了 __iter__,那么它的實例對象就被稱為可迭代對象(Iterable),比如字符串、元組、列表、字典、集合等等。而整數、浮點數,由于其類型對象沒有實現 __iter__,所以它們不是可迭代對象。
from typing import Iterable
print(
isinstance("", Iterable),
isinstance((), Iterable),
isinstance([], Iterable),
isinstance({}, Iterable),
isinstance(set(), Iterable),
) # True True True True True
print(
isinstance(0, Iterable),
isinstance(0.0, Iterable),
) # False False
可迭代對象的一大特點是它可以被 for 循環遍歷,但能被 for 循環遍歷的則不一定是可迭代對象。我們舉個例子:
class A:
def __getitem__(self, item):
return f"參數 item: {item}"
a = A()
# 內部定義了 __getitem__
# 首先可以讓實例對象像字典一樣訪問屬性
print(a["name"]) # 參數 item: name
print(a["satori"]) # 參數 item: satori
# 此外還可以像可迭代對象一樣被 for 循環
# 循環的時候會自動給 item 傳值:0 1 2 3 ...
# 如果內部出現了 StopIteration,循環結束
# 否則會一直循環下去,這里我們手動 break
for idx, val in enumerate(a):
print(val)
if idx == 5:
break
"""
參數 item: 0
參數 item: 1
參數 item: 2
參數 item: 3
參數 item: 4
參數 item: 5
"""
所以實現了 __getitem__ 的類的實例,也是可以被 for 循環的,但它并不是可迭代對象。
from typing import Iterable
print(isinstance(a, Iterable)) # False
總之判斷一個對象是否是可迭代對象,就看它的類型對象有沒有實現 __iter__。可迭代對象我們知道了,那什么是迭代器呢?很簡單,調用可迭代對象的 __iter__ 方法,得到的就是迭代器。
迭代器的創建
不同類型的對象,都有自己的迭代器,舉個栗子。
data = [1, 2, 3]
# 底層調用的其實是 list.__iter__(data)
# 或者說 PyList_Type.tp_iter(data)
it = data.__iter__()
print(it)
"""
<list_iterator object at 0x102c1cf10>
"""
print(str.__iter__(""))
"""
<str_iterator object at 0x100e623b0>
"""
print(tuple.__iter__(()))
"""
<tuple_iterator object at 0x100e623b0>
"""
# 不難發現,迭代器的種類有非常多
# 比如 list_iterator、str_iterator、tuple_iterator 等等
迭代器也是可迭代對象,只不過迭代器內部的 __iter__ 返回的還是它本身。當然啦,在創建迭代器的時候,我們更常用內置函數 iter。
data = [1, 2, 3]
# 等價于 type(data).__iter__(data)
it = iter(data)
但是 iter 函數還有一個鮮為人知的用法,我們來看一下:
val = 0
def foo():
global val
val += 1
return val
# iter 可以接收一個參數: iter(可迭代對象)
# iter 也可以接收兩個參數: iter(可調用對象, value)
for i in iter(foo, 5):
print(i)
"""
1
2
3
4
"""
進行迭代的時候,會不停地調用可調用對象,直到返回值等于傳遞的第二個參數 value(在底層被稱為哨兵),然后終止迭代。我們看一下 iter 函數的底層實現。
// Python/clinic/bltinmodule.c.h
static PyObject *
builtin_iter(PyObject *module, PyObject *const *args, Py_ssize_t nargs)
{
PyObject *return_value = NULL;
PyObject *object;
PyObject *sentinel = NULL;
// 內置函數 iter 接收 1 ~ 2 個參數
if (!_PyArg_CheckPositional("iter", nargs, 1, 2)) {
goto exit;
}
// 如果 nargs 小于 2,那么 args[0] 是可迭代對象
// 如果 nargs 等于 2,那么 args[0] 是可調用對象
object = args[0];
if (nargs < 2) {
goto skip_optional;
}
sentinel = args[1];
skip_optional:
// 具體實現由 builtin_iter_impl 負責
// 它會調用可迭代對象的 __iter__ 方法,返回迭代器
return_value = builtin_iter_impl(module, object, sentinel);
exit:
return return_value;
}
// Python/bltinmodule.c
static PyObject *
builtin_iter_impl(PyObject *module, PyObject *object, PyObject *sentinel)
{
// 如果哨兵為空,說明只傳了一個參數,那么該參數應該是可迭代對象
if (sentinel == NULL)
// 調用 PyObject_GetIter 獲取對象的迭代器
return PyObject_GetIter(object);
// 如果哨兵不為空,那么第一個參數應該是可調用對象
// 這里進行檢測,如果不是,拋出 TypeError
if (!PyCallable_Check(object)) {
PyErr_SetString(PyExc_TypeError,
"iter(object, sentinel): object must be callable");
return NULL;
}
// 一會兒單獨解釋
return PyCallIter_New(object, sentinel);
}
以上就是 iter 函數的內部邏輯,既可以接收一個參數,也可以接收兩個參數。這里我們只看接收一個可迭代對象的情況,所以核心就在 PyObject_GetIter 函數里面,它是根據可迭代對象生成迭代器的關鍵,我們來看一下它的邏輯是怎么樣的?
// Objects/abstract.c
PyObject *
PyObject_GetIter(PyObject *o)
{
// 獲取可迭代對象的類型對象,比如 o 是列表,那么 t 就是 list
PyTypeObject *t = Py_TYPE(o);
// 我們說類型對象定義的操作,決定了實例對象的行為
// 實例對象調用的那些方法都是定義在類型對象里面的
// 還是那句話:obj.func() 等價于 type(obj).func(obj)
getiterfunc f;
// 所以這里是獲取類型對象的 tp_iter 字段
// 也就是 Python 中的 __iter__
f = t->tp_iter;
// 如果 f 為 NULL,說明類型對象的內部沒有定義 __iter__
// 像 str、tuple、list 等類型對象,它們的 tp_iter 字段都是不為 NULL 的
if (f == NULL) {
// 如果 tp_iter 為 NULL,那么解釋器會退而求其次
// 檢測該類型對象中是否定義了 __getitem__
// 如果定義了,那么直接調用 PySeqIter_New,創建 seqiterobject 對象
// 下面的 PySequence_Check 函數負責檢測類型對象是否實現了 __getitem__
// __getitem__ 對應 tp_as_sequence->sq_item
if (PySequence_Check(o))
return PySeqIter_New(o);
// 走到這里說明該類型對象既沒有 __iter__、也沒有 __getitem__
// 因此它的實例對象不具備可迭代的性質,于是拋出異常
return type_error("'%.200s' object is not iterable", o);
}
else {
// 否則說明定義了 __iter__,于是直接進行調用
// Py_TYPE(o)->tp_iter(o) 返回對應的迭代器
PyObject *res = (*f)(o);
// 但如果返回值 res 不為 NULL、并且還不是迭代器
// 證明 __iter__ 的返回值有問題,于是拋出異常
if (res != NULL && !PyIter_Check(res)) {
PyErr_Format(PyExc_TypeError,
"iter() returned non-iterator "
"of type '%.100s'",
Py_TYPE(res)->tp_name);
Py_SETREF(res, NULL);
}
// 返回 res
return res;
}
}
以上便是 iter 函數的底層實現,還是很簡單的。然后是里面的 __getitem__,我們說如果類型對象內部沒有定義 __iter__,那么解釋器會退而求其次,檢測內部是否定義了 __getitem__。
因此以上就是迭代器的創建過程,每個可迭代對象都有自己的迭代器,而迭代器本質上就是對原始數據的一層封裝罷了。
迭代器的底層結構
由于迭代器的種類非常多,字符串、元組、列表等等,都有自己的迭代器,這里就不一一介紹了。我們就以列表的迭代器為例,看看迭代器在底層的結構是怎么樣的。
// Objects/listobject.c
// 列表迭代器的類型對象為 <class 'list_iterator'>
// 但這個類,解釋器并沒有暴露給我們,所以需要通過 type 獲取
// 然后它的 tp_basicsize 字段為 sizeof(_PyListIterObject)
// 這就說明列表迭代器在底層由 _PyListIterObject 結構體表示
PyTypeObject PyListIter_Type = {
PyVarObject_HEAD_INIT(&PyType_Type, 0)
"list_iterator", /* tp_name */
sizeof(_PyListIterObject), /* tp_basicsize */
0, /* tp_itemsize */
// ...
};
// Include/internal/pycore_list.h
typedef struct {
PyObject_HEAD
Py_ssize_t it_index;
// 指向創建該迭代器的列表
PyListObject *it_seq;
} _PyListIterObject;
所以迭代器就是基于可迭代對象進行了一層簡單的封裝,所謂元素迭代本質上還是基于索引,并且每迭代一次,索引就自增 1。一旦出現索引越界,就將 it_seq 設置為 NULL,表示迭代器迭代完畢。
我們實際演示一下:
from ctypes import *
class PyObject(Structure):
_fields_ = [
("ob_refcnt", c_ssize_t),
("ob_size", c_void_p)
]
class ListIterObject(PyObject):
_fields_ = [
("it_index", c_ssize_t),
("it_seq", POINTER(PyObject))
]
it = iter([1, 2, 3])
it_obj = ListIterObject.from_address(id(it))
# it_seq 指向列表 [1, 2, 3],it_index 初始為 0
print(it_obj.it_index) # 0
# 進行迭代
next(it)
# 索引自增 1,此時 it_index 等于 1
print(it_obj.it_index) # 1
# 再次迭代
next(it)
# 此時 it_index 等于 2
print(it_obj.it_index) # 2
# 再次迭代
next(it)
# 此時 it_index 等于 3
print(it_obj.it_index) # 3
當 it_index 為 3 的時候,如果再次迭代,那么底層會發現 it_index 已超過最大索引,于是知道迭代器已經迭代完畢了。因此會將 it_seq 設置為 NULL,并拋出 StopIteration。如果是 for 循環,那么會自動捕獲此異常,然后停止循環。
所以這就是迭代器,真的沒有想象中的那么神秘,甚至在知道它的實現原理之后,還覺得有點 low,因為就是將原始數據包了一層,加了一個索引而已。所謂的迭代仍然是基于索引來做的,并且每迭代一次,索引就自增 1。當索引超出范圍時,證明迭代完畢了,于是將 it_seq 字段設置為 NULL,拋出 StopIteration。
迭代器是怎么迭代元素的
迭代器的創建我們知道了,那么它是怎么迭代元素的呢?首先迭代元素可以通過 next 函數,當然它本質上也是調用了對象的 __next__ 方法。
// Python/clinic/bltinmodule.c.h
static PyObject *
builtin_next(PyObject *module, PyObject *const *args, Py_ssize_t nargs)
{
PyObject *return_value = NULL;
PyObject *iterator;
PyObject *default_value = NULL;
// 同樣接收 1 ~ 2 個參數
// 因為調用 next 函數時,可以傳入一個默認值
// 表示當迭代器沒有元素可以迭代的時候,會返回指定的默認值
if (!_PyArg_CheckPositional("next", nargs, 1, 2)) {
goto exit;
}
// 迭代器
iterator = args[0];
if (nargs < 2) {
goto skip_optional;
}
// 默認值
default_value = args[1];
skip_optional:
return_value = builtin_next_impl(module, iterator, default_value);
exit:
return return_value;
}
// Python/bltinmodule.c
static PyObject *
builtin_next_impl(PyObject *module, PyObject *iterator,
PyObject *default_value)
{
PyObject *res;
// 第一個參數必須是迭代器,否則拋出 TypeError
if (!PyIter_Check(iterator)) {
PyErr_Format(PyExc_TypeError,
"'%.200s' object is not an iterator",
Py_TYPE(iterator)->tp_name);
return NULL;
}
// Py_TYPE(iterator) 表示獲取類型對象,也就是該迭代器的類型
// 當然具體類型是哪一種并不確定,可能是列表迭代器、元組迭代器、字符串迭代器等等
// 然后再獲取 tp_iternext 字段,相當于 __next__
// 拿到函數指針之后,傳入迭代器進行調用
res = (*Py_TYPE(iterator)->tp_iternext)(iterator);
// 如果 res 不為 NULL, 那么證明迭代到值了, 直接返回
if (res != NULL) {
return res;
} else if (default_value != NULL) {
// 否則的話,說明沒有迭代到值(返回 NULL),那么這時候有兩種情況
// 1)迭代器已耗盡,2)在迭代過程中出現異常
// 那么判斷 default_value,如果不為 NULL,說明設置了默認值
if (PyErr_Occurred()) {
// 檢測異常是不是迭代完畢時(或者手動 raise)產生的 StopIteration 異常
if(!PyErr_ExceptionMatches(PyExc_StopIteration))
// 如果不是,說明程序的邏輯有問題,直接 return NULL,結束執行
// 然后在 Python 里面我們會看到打印到 stderr 中的異常信息
return NULL;
// 如果異常是 StopIteration,證明迭代完畢了
// 但我們設置了默認值,那么就應該返回默認值
// 而不應該拋出 StopIteration,于是將異常回溯棧給清空
PyErr_Clear();
}
// 增加 default_value 的引用計數,然后返回
return Py_NewRef(default_value);
} else if (PyErr_Occurred()) {
// 走到這里說明 res == NULL,并且沒有指定默認值
// 那么當發生異常時,將異常直接拋出
return NULL;
} else {
// 都不是的話,直接拋出 StopIteration
PyErr_SetNone(PyExc_StopIteration);
return NULL;
}
}
以上就是 next 函數的背后邏輯,實際上還是調用了迭代器的 __next__ 方法。
data = [1, 2, 3]
it = iter(data)
# 然后迭代,等價于 next(it)
print(type(it).__next__(it)) # 1
print(type(it).__next__(it)) # 2
print(type(it).__next__(it)) # 3
# 但是 next 可以指定默認值
# 如果不指定默認值,或者還是 type(it).__next__(it)
# 那么就會報錯,拋出 StopIteration
print(next(it, 666)) # 666
以上就是元素的迭代,由于內置函數 next 還可以指定一個默認值,所以更強大一些。當然在不指定默認值的情況下,next(it) 和 type(it).__next__(it) 最終是殊途同歸的。
我們仍以列表的迭代器為例,看看 __next__ 的具體實現。但是要想找到具體實現,首先要找到它的類型對象。
我們看到 tp_iternext 字段指向了 listiter_next,證明迭代的時候調用的是這個函數。
// Objects/listobject.c
static PyObject *
listiter_next(_PyListIterObject *it)
{
// 迭代器只是對可迭代對象的一層封裝
// 如果是列表的迭代器,那么內部的 it_seq 字段便指向列表
PyListObject *seq;
PyObject *item;
assert(it != NULL);
// 如果 it->it_seq 等于 NULL,說明迭代器已經迭代完畢了
// 從這里也能看出迭代器不能二次循環迭代
seq = it->it_seq;
if (seq == NULL)
return NULL;
assert(PyList_Check(seq));
// 如果 it->it_index 小于列表的長度
if (it->it_index < PyList_GET_SIZE(seq)) {
// 那么獲取元素
item = PyList_GET_ITEM(seq, it->it_index);
// it_index 自增 1
++it->it_index;
// 增加元素的引用計數,并返回
return Py_NewRef(item);
}
// 否則說明 it_index 已經達到了列表的長度
// 再迭代就索引越界了,而對于迭代器來說
// 當 it_index 等于列表長度時,就證明所有元素都迭代完畢了
it->it_seq = NULL; // 將 it_seq 設置為 NULL
Py_DECREF(seq);
return NULL;
}
顯然這和之前分析的是一樣的,以上我們就以列表為例,考察了迭代器的實現原理和元素迭代的具體過程。當然其它對象也有自己的迭代器,有興趣可以自己看一看,實現方式都大同小異。
小結
通過探究迭代器,我們再次體會到了 Python 的設計哲學,雖然一切皆對象,但是拿到的都是對象的指針。像變量、函數參數等,它們存儲的都不是對象本身,而是對象的泛型指針。而基于 PyObject * 和 ob_type,Python 巧妙地實現了多態。
不管變量 obj 指向什么樣的可迭代對象,都可以交給 iter 函數,會調用類型對象內部的 __iter__(底層對應 tp_iter 字段),得到迭代器。不管變量 it 指向什么樣的迭代器,都可以交給 next 函數進行迭代,會調用迭代器的類型對象的 __next__(底層對應 tp_iternext 字段),將值迭代出來。
至于 __iter__ 和 __next__ 本身,每個迭代器都會有,我們這里只以列表的迭代器為例。所以這是不是實現了多態呢?
這就是 Python 的設計哲學,變量只是一個指針,傳遞變量的時候相當于傳遞指針(將指針拷貝一份),但是操作一個變量的時候會自動操作變量(指針)指向的內存。
對了,我們說 iter 函數如果接收兩個參數,那么第一個參數要是 callable,第二個參數是哨兵。迭代時會調用 callable,當返回值等于哨兵時,迭代結束,那么它的底層是怎么實現的呢?這里簡單補充一下。
// Python/bltinmodule.c
static PyObject *
builtin_iter_impl(PyObject *module, PyObject *object, PyObject *sentinel)
{
if (sentinel == NULL)
return PyObject_GetIter(object);
if (!PyCallable_Check(object)) {
PyErr_SetString(PyExc_TypeError,
"iter(object, sentinel): object must be callable");
return NULL;
}
// 如果 sentinel 不等于 NULL,會調用 PyCallIter_New
return PyCallIter_New(object, sentinel);
}
// Objects/iterobject.c
typedef struct {
PyObject_HEAD
PyObject *it_callable;
PyObject *it_sentinel;
} calliterobject;
PyObject *
PyCallIter_New(PyObject *callable, PyObject *sentinel)
{
// iter(callable, value) 會返回一個 <class 'callable_iterator'> 實例
// 在底層由 calliterobject 結構體實現
calliterobject *it;
// 為 calliterobject 實例申請內存
it = PyObject_GC_New(calliterobject, &PyCallIter_Type);
if (it == NULL)
return NULL;
// 初始化字段
it->it_callable = Py_NewRef(callable);
it->it_sentinel = Py_NewRef(sentinel);
_PyObject_GC_TRACK(it);
return (PyObject *)it;
}
// 再來看看迭代過程
static PyObject *
calliter_iternext(calliterobject *it)
{
PyObject *result;
// 如果 it_callable 字段為空,說明迭代結束,不能再次迭代
if (it->it_callable == NULL) {
return NULL;
}
// 調用 it_callable,拿到返回值 result
result = _PyObject_CallNoArgs(it->it_callable);
if (result != NULL && it->it_sentinel != NULL){
int ok;
// 如果 result 和哨兵相等,那么 ok == 1,否則 ok == 0
ok = PyObject_RichCompareBool(it->it_sentinel, result, Py_EQ);
// 如果 ok == 0,說明兩者不相等,那么返回 result
if (ok == 0) {
return result;
}
// 如果返回值和哨兵相等,那么迭代結束
// 減少引用計數,并將 it_callable 和 it_sentinel 字段設置為 NULL
if (ok > 0) {
Py_CLEAR(it->it_callable);
Py_CLEAR(it->it_sentinel);
}
}
else if (PyErr_ExceptionMatches(PyExc_StopIteration)) {
// 如果函數拋出了 StopIteration 異常,同樣視為迭代結束
PyErr_Clear();
Py_CLEAR(it->it_callable);
Py_CLEAR(it->it_sentinel);
}
Py_XDECREF(result);
return NULL;
}
以上就是 Python 迭代器的相關內容,當然我們也完全可以自己封裝一個迭代器,有興趣可以試一下。