iter() - 迭代器创建函数
创建迭代器对象的内置函数,支持多种迭代模式
iter() - 迭代器创建函数
📝 概述
iter() 是 Python 中用于创建迭代器对象的内置函数。它可以将可迭代对象转换为迭代器,也可以通过调用函数和哨兵值创建迭代器。迭代器是 Python 中实现惰性求值和内存高效数据处理的核心机制,广泛应用于循环、数据流处理和函数式编程中。
🎯 学习目标
- 掌握 iter()函数的两种调用形式和用法
- 理解迭代器协议和迭代器的工作原理
- 学会使用 iter()创建自定义迭代器
- 了解迭代器在内存优化和数据流处理中的应用
- 掌握迭代器的高级用法和最佳实践
📋 前置知识
- Python 基础语法和数据类型
- 可迭代对象和迭代器的概念
- 生成器和 yield 关键字
- 函数式编程基础
- 异常处理机制
🔍 详细内容
基本概念
iter() 函数有两种调用形式:
iter(iterable)- 从可迭代对象创建迭代器iter(callable, sentinel)- 从可调用对象和哨兵值创建迭代器
迭代器是实现了迭代器协议的对象,必须实现 __iter__() 和 __next__() 方法。
语法格式
## 形式 1:从可迭代对象创建迭代器
iter(iterable)
## 形式 2:从可调用对象创建迭代器
iter(callable, sentinel)
参数说明
| 参数名 | 类型 | 必需 | 说明 |
|---|---|---|---|
| iterable | 可迭代对象 | 是 | 任何实现了__iter__()或__getitem__()的对象 |
| callable | 可调用对象 | 是 | 无参数的函数或方法 |
| sentinel | 任意类型 | 是 | 哨兵值,当 callable 返回此值时停止迭代 |
返回值
- 返回类型: iterator 对象
- 特性: 实现了迭代器协议,支持 next()函数调用
- 状态: 有状态的,记住当前迭代位置
💡 实际应用
基础迭代器操作
## 基础迭代器操作示例
print("基础迭代器操作示例:")
## 1. 从可迭代对象创建迭代器
print("\n1. 从可迭代对象创建迭代器:")
## 列表迭代器
test_list = [1, 2, 3, 4, 5]
list_iter = iter(test_list)
print(f"原始列表: {test_list}")
print(f"迭代器类型: {type(list_iter)}")
print(f"迭代器对象: {list_iter}")
## 手动迭代
print("\n 手动迭代:")
try:
while True:
value = next(list_iter)
print(f" 下一个值: {value}")
except StopIteration:
print(" 迭代结束")
## 字符串迭代器
test_string = "Hello"
string_iter = iter(test_string)
print(f"\n 字符串: '{test_string}'")
print(f"字符迭代: {list(string_iter)}")
## 字典迭代器
test_dict = {'a': 1, 'b': 2, 'c': 3}
dict_iter = iter(test_dict) # 默认迭代键
print(f"\n 字典: {test_dict}")
print(f"键迭代: {list(dict_iter)}")
## 字典值和项的迭代器
values_iter = iter(test_dict.values())
items_iter = iter(test_dict.items())
print(f"值迭代: {list(values_iter)}")
print(f"项迭代: {list(items_iter)}")
## 范围迭代器
range_iter = iter(range(3, 8))
print(f"\n 范围迭代: {list(range_iter)}")
## 集合迭代器
test_set = {10, 20, 30, 40}
set_iter = iter(test_set)
print(f"集合: {test_set}")
print(f"集合迭代: {list(set_iter)}")
## 2. 迭代器的特性
print("\n2. 迭代器的特性:")
## 迭代器是一次性的
test_data = [1, 2, 3]
iterator = iter(test_data)
print(f"原始数据: {test_data}")
print(f"第一次迭代: {list(iterator)}")
print(f"第二次迭代: {list(iterator)}")
print("注意: 迭代器是一次性的,第二次迭代为空")
## 迭代器自身也是可迭代的
iterator2 = iter(test_data)
iterator3 = iter(iterator2) # 迭代器的迭代器就是自身
print(f"\niterator2 is iterator3: {iterator2 is iterator3}")
## 迭代器状态
iterator4 = iter([10, 20, 30, 40, 50])
print(f"\n 迭代器状态演示:")
print(f"第 1 个元素: {next(iterator4)}")
print(f"第 2 个元素: {next(iterator4)}")
print(f"剩余元素: {list(iterator4)}")
## 3. 使用 iter()的不同方式
print("\n3. 使用 iter()的不同方式:")
## 从生成器表达式创建迭代器
gen_expr = (x**2 for x in range(5))
gen_iter = iter(gen_expr)
print(f"生成器表达式迭代: {list(gen_iter)}")
## 从文件对象创建迭代器(模拟)
class MockFile:
def __init__(self, lines):
self.lines = lines
self.index = 0
def __iter__(self):
return self
def __next__(self):
if self.index >= len(self.lines):
raise StopIteration
line = self.lines[self.index]
self.index += 1
return line
mock_file = MockFile(["line 1\n", "line 2\n", "line 3\n"])
file_iter = iter(mock_file)
print(f"\n 文件行迭代:")
for line in file_iter:
print(f" {line.strip()}")
## 从自定义可迭代对象创建迭代器
class CountDown:
def __init__(self, start):
self.start = start
def __iter__(self):
return CountDownIterator(self.start)
class CountDownIterator:
def __init__(self, start):
self.current = start
def __iter__(self):
return self
def __next__(self):
if self.current <= 0:
raise StopIteration
self.current -= 1
return self.current + 1
countdown = CountDown(5)
countdown_iter = iter(countdown)
print(f"\n 倒计时迭代: {list(countdown_iter)}")
## 4. 迭代器协议验证
print("\n4. 迭代器协议验证:")
def check_iterator_protocol(obj):
"""检查对象是否实现了迭代器协议"""
has_iter = hasattr(obj, '__iter__')
has_next = hasattr(obj, '__next__')
print(f"对象: {obj}")
print(f"类型: {type(obj).__name__}")
print(f"有__iter__方法: {has_iter}")
print(f"有__next__方法: {has_next}")
print(f"是迭代器: {has_iter and has_next}")
print(f"是可迭代对象: {has_iter}")
print()
## 测试不同对象
test_objects = [
[1, 2, 3], # 列表(可迭代,非迭代器)
iter([1, 2, 3]), # 列表迭代器
"hello", # 字符串(可迭代,非迭代器)
iter("hello"), # 字符串迭代器
range(3), # 范围对象(可迭代,非迭代器)
iter(range(3)), # 范围迭代器
]
for obj in test_objects:
check_iterator_protocol(obj)
可调用对象迭代器
## 可调用对象迭代器应用
print("可调用对象迭代器应用:")
## 1. 基础可调用对象迭代器
print("\n1. 基础可调用对象迭代器:")
import random
import time
## 随机数生成器
def random_number():
"""生成 1-10 的随机数"""
return random.randint(1, 10)
## 创建迭代器,当生成 5 时停止
random_iter = iter(random_number, 5)
print("生成随机数直到遇到 5:")
## 设置随机种子以便复现
random.seed(42)
results = []
for i, num in enumerate(random_iter):
results.append(num)
print(f" 第{i+1}次: {num}")
if i >= 10: # 防止无限循环
print(" (为演示目的,限制最多 11 次)")
break
print(f"生成的数字: {results}")
## 2. 文件读取迭代器
print("\n2. 文件读取迭代器模拟:")
class MockFileReader:
"""模拟文件读取器"""
def __init__(self, content):
self.lines = content.split('\n')
self.index = 0
def read_line(self):
"""读取一行,如果到达文件末尾返回空字符串"""
if self.index >= len(self.lines):
return ''
line = self.lines[self.index]
self.index += 1
return line
## 创建模拟文件
file_content = """第一行内容
第二行内容
第三行内容
最后一行"""
file_reader = MockFileReader(file_content)
## 使用 iter()读取文件直到遇到空字符串
file_iter = iter(file_reader.read_line, '')
print("逐行读取文件:")
for line_num, line in enumerate(file_iter, 1):
print(f" 行{line_num}: {line}")
## 3. 传感器数据读取
print("\n3. 传感器数据读取模拟:")
class TemperatureSensor:
"""模拟温度传感器"""
def __init__(self):
self.base_temp = 20.0
self.readings = 0
def read_temperature(self):
"""读取温度,模拟传感器故障返回-999"""
self.readings += 1
# # 模拟传感器在第 8 次读取时故障
if self.readings == 8:
return -999 # 错误代码
# # 模拟温度波动
variation = random.uniform(-2, 2)
return round(self.base_temp + variation, 1)
sensor = TemperatureSensor()
random.seed(123) # 设置种子以便复现
## 读取温度直到遇到错误代码
temp_iter = iter(sensor.read_temperature, -999)
print("读取温度数据直到传感器故障:")
temperatures = []
for reading_num, temp in enumerate(temp_iter, 1):
temperatures.append(temp)
print(f" 读取{reading_num}: {temp}°C")
print(f"有效温度读数: {temperatures}")
print(f"平均温度: {sum(temperatures)/len(temperatures):.1f}°C")
## 4. 用户输入迭代器
print("\n4. 用户输入迭代器模拟:")
class MockInput:
"""模拟用户输入"""
def __init__(self, inputs):
self.inputs = inputs
self.index = 0
def get_input(self):
"""模拟获取用户输入"""
if self.index >= len(self.inputs):
return 'quit'
value = self.inputs[self.index]
self.index += 1
print(f" 用户输入: {value}")
return value
## 模拟用户输入序列
user_inputs = ['hello', 'world', 'python', 'programming', 'quit']
mock_input = MockInput(user_inputs)
## 创建迭代器,直到用户输入'quit'
input_iter = iter(mock_input.get_input, 'quit')
print("收集用户输入直到输入'quit':")
collected_inputs = list(input_iter)
print(f"收集到的输入: {collected_inputs}")
## 5. 网络数据接收模拟
print("\n5. 网络数据接收模拟:")
class NetworkReceiver:
"""模拟网络数据接收器"""
def __init__(self, data_packets):
self.packets = data_packets
self.index = 0
def receive_packet(self):
"""接收数据包,返回 None 表示连接断开"""
if self.index >= len(self.packets):
return None
packet = self.packets[self.index]
self.index += 1
# # 模拟网络延迟
time.sleep(0.01)
return packet
## 模拟网络数据包
data_packets = [
{'type': 'data', 'payload': 'packet1'},
{'type': 'data', 'payload': 'packet2'},
{'type': 'heartbeat', 'payload': 'ping'},
{'type': 'data', 'payload': 'packet3'},
{'type': 'data', 'payload': 'packet4'},
]
receiver = NetworkReceiver(data_packets)
## 接收数据直到连接断开(返回 None)
network_iter = iter(receiver.receive_packet, None)
print("接收网络数据包直到连接断开:")
received_packets = []
for packet_num, packet in enumerate(network_iter, 1):
received_packets.append(packet)
print(f" 包{packet_num}: {packet}")
print(f"总共接收: {len(received_packets)}个数据包")
## 6. 高级可调用迭代器模式
print("\n6. 高级可调用迭代器模式:")
class StatefulGenerator:
"""有状态的生成器"""
def __init__(self, initial_value, increment):
self.value = initial_value
self.increment = increment
self.calls = 0
def next_value(self):
"""生成下一个值"""
self.calls += 1
current = self.value
self.value += self.increment
# # 在第 6 次调用时返回特殊值
if self.calls == 6:
return 'STOP'
return current
def get_stats(self):
"""获取统计信息"""
return {
'calls': self.calls,
'current_value': self.value,
'increment': self.increment
}
## 创建状态生成器
generator = StatefulGenerator(10, 3)
## 生成值直到遇到'STOP'
stateful_iter = iter(generator.next_value, 'STOP')
print("有状态生成器:")
generated_values = []
for value in stateful_iter:
generated_values.append(value)
print(f" 生成值: {value}")
stats = generator.get_stats()
print(f"生成的值: {generated_values}")
print(f"生成器统计: {stats}")
## 7. 条件迭代器
print("\n7. 条件迭代器:")
class ConditionalIterator:
"""条件迭代器"""
def __init__(self, data, condition_func):
self.data = data
self.condition = condition_func
self.index = 0
def next_matching(self):
"""返回下一个满足条件的元素"""
while self.index < len(self.data):
item = self.data[self.index]
self.index += 1
if self.condition(item):
return item
return 'END' # 没有更多匹配的元素
## 测试数据
test_numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 15, 18, 20]
## 创建条件迭代器:只返回偶数
even_iterator = ConditionalIterator(test_numbers, lambda x: x % 2 == 0)
even_iter = iter(even_iterator.next_matching, 'END')
print(f"原始数据: {test_numbers}")
print("偶数迭代:")
even_numbers = list(even_iter)
print(f" 偶数: {even_numbers}")
## 创建条件迭代器:只返回大于 10 的数
large_iterator = ConditionalIterator(test_numbers, lambda x: x > 10)
large_iter = iter(large_iterator.next_matching, 'END')
print("大于 10 的数迭代:")
large_numbers = list(large_iter)
print(f" 大于 10: {large_numbers}")
高级迭代器应用
## 高级迭代器应用
print("高级迭代器应用:")
## 1. 迭代器链和组合
print("\n1. 迭代器链和组合:")
class IteratorChain:
"""迭代器链"""
def __init__(self, *iterables):
self.iterables = iterables
self.current_iter = None
self.iter_index = 0
def __iter__(self):
return self
def __next__(self):
while self.iter_index < len(self.iterables):
if self.current_iter is None:
self.current_iter = iter(self.iterables[self.iter_index])
try:
return next(self.current_iter)
except StopIteration:
self.current_iter = None
self.iter_index += 1
raise StopIteration
## 测试迭代器链
chain = IteratorChain([1, 2, 3], "abc", range(5, 8))
chain_iter = iter(chain)
print(f"迭代器链结果: {list(chain_iter)}")
## 使用内置的 itertools.chain 进行比较
import itertools
builtin_chain = itertools.chain([1, 2, 3], "abc", range(5, 8))
print(f"内置 chain 结果: {list(builtin_chain)}")
## 2. 缓存迭代器
print("\n2. 缓存迭代器:")
class CachedIterator:
"""缓存迭代器"""
def __init__(self, iterable, cache_size=10):
self.iterator = iter(iterable)
self.cache = []
self.cache_size = cache_size
self.position = 0
def __iter__(self):
return self
def __next__(self):
# # 如果位置在缓存范围内,直接返回缓存的值
if self.position < len(self.cache):
value = self.cache[self.position]
self.position += 1
return value
# # 否则从原始迭代器获取新值
try:
value = next(self.iterator)
# # 如果缓存未满,添加到缓存
if len(self.cache) < self.cache_size:
self.cache.append(value)
self.position += 1
return value
except StopIteration:
raise StopIteration
def reset(self):
"""重置到缓存开始"""
self.position = 0
def get_cache_info(self):
"""获取缓存信息"""
return {
'cache_size': len(self.cache),
'max_cache_size': self.cache_size,
'current_position': self.position,
'cached_values': self.cache.copy()
}
## 测试缓存迭代器
def expensive_generator():
"""模拟昂贵的计算"""
for i in range(15):
print(f" 计算第{i+1}个值...")
time.sleep(0.01) # 模拟计算时间
yield i ** 2
print("创建缓存迭代器(缓存大小=5):")
cached_iter = CachedIterator(expensive_generator(), cache_size=5)
## 第一次迭代前 5 个元素
print("\n 第一次迭代前 5 个元素:")
first_batch = []
for i in range(5):
value = next(cached_iter)
first_batch.append(value)
print(f" 值{i+1}: {value}")
print(f"缓存信息: {cached_iter.get_cache_info()}")
## 重置并再次迭代前 3 个元素(应该从缓存获取)
print("\n 重置并迭代前 3 个元素(从缓存):")
cached_iter.reset()
second_batch = []
for i in range(3):
value = next(cached_iter)
second_batch.append(value)
print(f" 缓存值{i+1}: {value}")
print(f"第一批: {first_batch}")
print(f"第二批: {second_batch}")
print(f"缓存命中: {first_batch[:3] == second_batch}")
## 3. 分页迭代器
print("\n3. 分页迭代器:")
class PagedIterator:
"""分页迭代器"""
def __init__(self, data, page_size):
self.data = data
self.page_size = page_size
self.current_page = 0
def __iter__(self):
return self
def __next__(self):
start_index = self.current_page * self.page_size
end_index = start_index + self.page_size
if start_index >= len(self.data):
raise StopIteration
page = self.data[start_index:end_index]
self.current_page += 1
return {
'page_number': self.current_page,
'data': page,
'size': len(page),
'is_last': end_index >= len(self.data)
}
def get_page_info(self):
"""获取分页信息"""
total_pages = (len(self.data) + self.page_size - 1) // self.page_size
return {
'total_items': len(self.data),
'page_size': self.page_size,
'total_pages': total_pages,
'current_page': self.current_page
}
## 测试分页迭代器
large_dataset = list(range(1, 24)) # 23 个元素
page_size = 5
print(f"数据集: {large_dataset}")
print(f"页面大小: {page_size}")
paged_iter = PagedIterator(large_dataset, page_size)
print(f"分页信息: {paged_iter.get_page_info()}")
print("\n 分页迭代:")
for page in paged_iter:
print(f" 页面{page['page_number']}: {page['data']} (大小: {page['size']}, 最后页: {page['is_last']})")
## 4. 条件过滤迭代器
print("\n4. 条件过滤迭代器:")
class FilterIterator:
"""条件过滤迭代器"""
def __init__(self, iterable, *conditions):
self.iterator = iter(iterable)
self.conditions = conditions
def __iter__(self):
return self
def __next__(self):
while True:
try:
value = next(self.iterator)
# # 检查所有条件
if all(condition(value) for condition in self.conditions):
return value
except StopIteration:
raise StopIteration
## 定义过滤条件
def is_even(x):
return x % 2 == 0
def is_positive(x):
return x > 0
def is_less_than_20(x):
return x < 20
## 测试数据
test_data = [-5, -2, 0, 1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 18, 20, 22, 25]
print(f"测试数据: {test_data}")
## 单条件过滤
even_filter = FilterIterator(test_data, is_even)
print(f"偶数过滤: {list(even_filter)}")
## 多条件过滤
multi_filter = FilterIterator(test_data, is_even, is_positive, is_less_than_20)
print(f"偶数且正数且小于 20: {list(multi_filter)}")
## 使用 lambda 表达式
lambda_filter = FilterIterator(test_data, lambda x: x % 3 == 0, lambda x: x > 0)
print(f"能被 3 整除且为正数: {list(lambda_filter)}")
## 5. 窗口迭代器
print("\n5. 窗口迭代器:")
class WindowIterator:
"""滑动窗口迭代器"""
def __init__(self, iterable, window_size, step=1):
self.data = list(iterable) # 转换为列表以支持索引
self.window_size = window_size
self.step = step
self.position = 0
def __iter__(self):
return self
def __next__(self):
if self.position + self.window_size > len(self.data):
raise StopIteration
window = self.data[self.position:self.position + self.window_size]
self.position += self.step
return window
## 测试窗口迭代器
sequence = list(range(1, 11)) # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
print(f"序列: {sequence}")
## 不同窗口配置
window_configs = [
(3, 1, "窗口大小 3,步长 1"),
(4, 2, "窗口大小 4,步长 2"),
(2, 1, "窗口大小 2,步长 1"),
]
for window_size, step, description in window_configs:
window_iter = WindowIterator(sequence, window_size, step)
windows = list(window_iter)
print(f"\n{description}:")
for i, window in enumerate(windows):
print(f" 窗口{i+1}: {window}")
## 6. 统计迭代器
print("\n6. 统计迭代器:")
class StatsIterator:
"""统计迭代器"""
def __init__(self, iterable):
self.iterator = iter(iterable)
self.count = 0
self.sum = 0
self.min_val = None
self.max_val = None
self.values = []
def __iter__(self):
return self
def __next__(self):
try:
value = next(self.iterator)
# # 更新统计信息
self.count += 1
self.sum += value
self.values.append(value)
if self.min_val is None or value < self.min_val:
self.min_val = value
if self.max_val is None or value > self.max_val:
self.max_val = value
return value
except StopIteration:
raise StopIteration
def get_stats(self):
"""获取统计信息"""
if self.count == 0:
return {'count': 0}
return {
'count': self.count,
'sum': self.sum,
'average': self.sum / self.count,
'min': self.min_val,
'max': self.max_val,
'range': self.max_val - self.min_val if self.min_val is not None else 0,
'values': self.values.copy()
}
## 测试统计迭代器
test_values = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5]
print(f"测试值: {test_values}")
stats_iter = StatsIterator(test_values)
print("\n 迭代过程中的统计:")
processed_values = []
for i, value in enumerate(stats_iter):
processed_values.append(value)
if i % 3 == 2: # 每 3 个值显示一次统计
current_stats = stats_iter.get_stats()
print(f" 处理{i+1}个值后: 平均={current_stats['average']:.2f}, 最小={current_stats['min']}, 最大={current_stats['max']}")
final_stats = stats_iter.get_stats()
print(f"\n 最终统计: {final_stats}")
print(f"处理的值: {processed_values}")
print(f"原始值一致: {test_values == processed_values}")
⚠️ 注意事项
常见陷阱和最佳实践
## iter()函数的常见陷阱和最佳实践
print("iter()函数常见陷阱和最佳实践:")
## 陷阱 1: 迭代器的一次性特性
print("\n 陷阱 1: 迭代器的一次性特性")
data = [1, 2, 3, 4, 5]
iterator = iter(data)
print(f"原始数据: {data}")
print(f"第一次完整迭代: {list(iterator)}")
print(f"第二次迭代(已耗尽): {list(iterator)}")
print(f"再次尝试 next(): ", end="")
try:
next(iterator)
except StopIteration:
print("StopIteration 异常")
print("\n 解决方案:")
print("1. 重新创建迭代器")
iterator2 = iter(data)
print(f" 新迭代器: {list(iterator2)}")
print("2. 使用 itertools.tee()创建多个独立迭代器")
import itertools
iter1, iter2 = itertools.tee(iter(data), 2)
print(f" 迭代器 1: {list(iter1)}")
print(f" 迭代器 2: {list(iter2)}")
## 陷阱 2: 可调用对象迭代器的无限循环风险
print("\n 陷阱 2: 可调用对象迭代器的无限循环风险")
import random
def random_choice():
"""随机返回 1 或 2"""
return random.choice([1, 2])
## 危险:如果哨兵值很难出现,可能导致无限循环
random.seed(42)
print("尝试生成随机数直到遇到 3(可能很久):")
## 安全的做法:添加计数器限制
class SafeCallableIterator:
def __init__(self, callable_func, sentinel, max_iterations=100):
self.callable_func = callable_func
self.sentinel = sentinel
self.max_iterations = max_iterations
self.iterations = 0
def __iter__(self):
return self
def __next__(self):
if self.iterations >= self.max_iterations:
raise StopIteration(f"达到最大迭代次数 {self.max_iterations}")
value = self.callable_func()
self.iterations += 1
if value == self.sentinel:
raise StopIteration
return value
safe_iter = SafeCallableIterator(random_choice, 3, max_iterations=20)
results = list(safe_iter)
print(f"安全迭代结果: {results}")
print(f"迭代次数: {safe_iter.iterations}")
## 陷阱 3: 修改正在迭代的容器
print("\n 陷阱 3: 修改正在迭代的容器")
original_list = [1, 2, 3, 4, 5]
print(f"原始列表: {original_list}")
print("\n 危险操作:迭代时修改列表")
test_list = original_list.copy()
iterator = iter(test_list)
try:
for value in iterator:
print(f" 当前值: {value}")
if value == 3:
test_list.append(6) # 修改列表
print(f" 添加元素后列表: {test_list}")
except Exception as e:
print(f" 错误: {e}")
print("\n 安全做法 1:迭代副本")
test_list = original_list.copy()
for value in test_list.copy(): # 迭代副本
print(f" 当前值: {value}")
if value == 3:
test_list.append(6)
print(f" 修改原列表: {test_list}")
print("\n 安全做法 2:收集修改操作,迭代后执行")
test_list = original_list.copy()
modifications = []
for value in test_list:
print(f" 当前值: {value}")
if value == 3:
modifications.append(('append', 6))
## 执行修改
for operation, arg in modifications:
if operation == 'append':
test_list.append(arg)
print(f" 执行修改后: {test_list}")
## 陷阱 4: 迭代器状态的意外共享
print("\n 陷阱 4: 迭代器状态的意外共享")
class SharedStateIterator:
"""有共享状态问题的迭代器"""
def __init__(self, data):
self.data = data
self.index = 0 # 共享状态
def __iter__(self):
return self # 返回自身,状态共享
def __next__(self):
if self.index >= len(self.data):
raise StopIteration
value = self.data[self.index]
self.index += 1
return value
shared_iter = SharedStateIterator([1, 2, 3, 4, 5])
print("问题:多个迭代器共享状态")
iter1 = iter(shared_iter)
iter2 = iter(shared_iter)
print(f"iter1 is iter2: {iter1 is iter2}")
print(f"从 iter1 获取: {next(iter1)}")
print(f"从 iter2 获取: {next(iter2)}")
print("注意:iter2 继续了 iter1 的位置")
print("\n 解决方案:每次创建新的迭代器实例")
class ProperIterator:
"""正确的迭代器实现"""
def __init__(self, data):
self.data = data
def __iter__(self):
return ProperIteratorInstance(self.data)
class ProperIteratorInstance:
def __init__(self, data):
self.data = data
self.index = 0
def __iter__(self):
return self
def __next__(self):
if self.index >= len(self.data):
raise StopIteration
value = self.data[self.index]
self.index += 1
return value
proper_iter = ProperIterator([1, 2, 3, 4, 5])
iter3 = iter(proper_iter)
iter4 = iter(proper_iter)
print(f"iter3 is iter4: {iter3 is iter4}")
print(f"从 iter3 获取: {next(iter3)}")
print(f"从 iter4 获取: {next(iter4)}")
print("正确:每个迭代器有独立状态")
## 陷阱 5: 性能考虑
print("\n 陷阱 5: 性能考虑")
import time
## 比较不同迭代方式的性能
large_data = list(range(100000))
iterations = 1000
print(f"性能测试(数据大小: {len(large_data)}, 迭代次数: {iterations}):")
## 方法 1: 直接 for 循环
start_time = time.time()
for _ in range(iterations):
for item in large_data:
pass
direct_time = time.time() - start_time
## 方法 2: 显式创建迭代器
start_time = time.time()
for _ in range(iterations):
iterator = iter(large_data)
for item in iterator:
pass
explicit_time = time.time() - start_time
## 方法 3: 重复使用迭代器(错误做法)
iterator = iter(large_data)
start_time = time.time()
for _ in range(iterations):
try:
for item in iterator:
pass
except StopIteration:
iterator = iter(large_data) # 重新创建
reuse_time = time.time() - start_time
print(f"直接 for 循环: {direct_time:.6f}秒")
print(f"显式创建迭代器: {explicit_time:.6f}秒")
print(f"重复使用迭代器: {reuse_time:.6f}秒")
print("\n 性能建议:")
print("1. 对于简单迭代,直接使用 for 循环")
print("2. 避免不必要的显式迭代器创建")
print("3. 注意迭代器的一次性特性对性能的影响")
## 陷阱 6: 内存泄漏风险
print("\n 陷阱 6: 内存泄漏风险")
class LeakyIterator:
"""可能导致内存泄漏的迭代器"""
def __init__(self, data):
self.data = data
self.processed = [] # 保存所有处理过的数据
self.index = 0
def __iter__(self):
return self
def __next__(self):
if self.index >= len(self.data):
raise StopIteration
value = self.data[self.index]
self.processed.append(value) # 累积数据,可能导致内存泄漏
self.index += 1
return value
def get_memory_usage(self):
return len(self.processed)
class MemoryEfficientIterator:
"""内存高效的迭代器"""
def __init__(self, data):
self.data = data
self.index = 0
self.count = 0 # 只保存计数,不保存数据
def __iter__(self):
return self
def __next__(self):
if self.index >= len(self.data):
raise StopIteration
value = self.data[self.index]
self.count += 1
self.index += 1
return value
def get_memory_usage(self):
return 1 # 只保存计数器
## 比较内存使用
test_data = list(range(1000))
leaky_iter = LeakyIterator(test_data)
efficient_iter = MemoryEfficientIterator(test_data)
## 处理一半数据
for i, (val1, val2) in enumerate(zip(leaky_iter, efficient_iter)):
if i >= 500:
break
print(f"处理 500 个元素后:")
print(f"泄漏迭代器内存使用: {leaky_iter.get_memory_usage()}个元素")
print(f"高效迭代器内存使用: {efficient_iter.get_memory_usage()}个计数器")
print("\n 内存管理建议:")
print("1. 避免在迭代器中累积大量数据")
print("2. 及时释放不需要的引用")
print("3. 考虑使用生成器而不是迭代器类")
print("4. 对于大数据集,使用惰性求值")
## 最佳实践总结
print("\n 最佳实践总结:")
print("1. 理解迭代器的一次性特性")
print("2. 为可调用对象迭代器设置安全限制")
print("3. 避免在迭代过程中修改容器")
print("4. 确保迭代器实例的独立性")
print("5. 根据场景选择合适的迭代方式")
print("6. 注意内存使用,避免不必要的数据累积")
print("7. 使用异常处理来优雅地处理 StopIteration")
print("8. 考虑使用 itertools 模块的高级迭代工具")
🔧 性能优化
iter() 性能优化技巧
## iter() 性能优化技巧
print("iter() 性能优化技巧:")
import time
import sys
## 1. 迭代器 vs 列表推导式性能比较
print("\n1. 迭代器 vs 列表推导式性能比较:")
def performance_test(data_size, iterations):
"""性能测试函数"""
data = range(data_size)
# # 方法 1: 列表推导式(一次性创建所有元素)
start_time = time.time()
for _ in range(iterations):
result = [x * 2 for x in data]
# # 模拟使用前几个元素
for i, val in enumerate(result):
if i >= 10:
break
list_comp_time = time.time() - start_time
# # 方法 2: 生成器表达式(惰性求值)
start_time = time.time()
for _ in range(iterations):
result_gen = (x * 2 for x in data)
result_iter = iter(result_gen)
# # 只获取前几个元素
for i in range(10):
next(result_iter)
generator_time = time.time() - start_time
# # 方法 3: 直接迭代器
start_time = time.time()
for _ in range(iterations):
data_iter = iter(data)
# # 只处理前几个元素
for i in range(10):
val = next(data_iter) * 2
direct_iter_time = time.time() - start_time
return list_comp_time, generator_time, direct_iter_time
## 测试不同数据大小
test_configs = [
(1000, 1000, "小数据集"),
(10000, 100, "中数据集"),
(100000, 10, "大数据集"),
]
for data_size, iterations, description in test_configs:
list_time, gen_time, iter_time = performance_test(data_size, iterations)
print(f"\n{description} (大小: {data_size}, 迭代: {iterations}次):")
print(f" 列表推导式: {list_time:.6f}秒")
print(f" 生成器表达式: {gen_time:.6f}秒")
print(f" 直接迭代器: {iter_time:.6f}秒")
fastest = min(list_time, gen_time, iter_time)
if fastest == gen_time:
print(f" 最快: 生成器表达式 (比列表推导式快 {list_time/gen_time:.1f}x)")
elif fastest == iter_time:
print(f" 最快: 直接迭代器 (比列表推导式快 {list_time/iter_time:.1f}x)")
else:
print(f" 最快: 列表推导式")
## 2. 内存使用优化
print("\n2. 内存使用优化:")
def get_memory_usage():
"""获取当前内存使用(简化版)"""
return sys.getsizeof([])
class MemoryOptimizedIterator:
"""内存优化的迭代器"""
def __init__(self, data_func, size):
self.data_func = data_func # 数据生成函数
self.size = size
self.index = 0
def __iter__(self):
return self
def __next__(self):
if self.index >= self.size:
raise StopIteration
# # 动态生成数据,不存储
value = self.data_func(self.index)
self.index += 1
return value
## 比较内存使用
data_size = 100000
## 方法 1: 预先生成所有数据
print(f"\n 内存使用比较 (数据大小: {data_size}):")
all_data = [i ** 2 for i in range(data_size)]
all_data_memory = sys.getsizeof(all_data)
print(f"预生成列表内存: {all_data_memory / 1024:.2f} KB")
## 方法 2: 使用内存优化迭代器
optimized_iter = MemoryOptimizedIterator(lambda x: x ** 2, data_size)
iter_memory = sys.getsizeof(optimized_iter)
print(f"优化迭代器内存: {iter_memory} bytes")
print(f"内存节省: {(all_data_memory - iter_memory) / 1024:.2f} KB")
## 验证功能一致性
iter_result = []
for i, value in enumerate(optimized_iter):
iter_result.append(value)
if i >= 9: # 只取前 10 个验证
break
list_result = all_data[:10]
print(f"功能验证 - 前 10 个元素一致: {iter_result == list_result}")
## 3. 批量处理优化
print("\n3. 批量处理优化:")
class BatchIterator:
"""批量处理迭代器"""
def __init__(self, iterable, batch_size):
self.iterator = iter(iterable)
self.batch_size = batch_size
def __iter__(self):
return self
def __next__(self):
batch = []
try:
for _ in range(self.batch_size):
batch.append(next(self.iterator))
except StopIteration:
if batch:
return batch
raise StopIteration
return batch
def process_individually(data):
"""逐个处理数据"""
results = []
for item in data:
# # 模拟处理时间
result = item ** 2 + item
results.append(result)
return results
def process_in_batches(data, batch_size):
"""批量处理数据"""
batch_iter = BatchIterator(data, batch_size)
results = []
for batch in batch_iter:
# # 批量处理(模拟向量化操作)
batch_results = [item ** 2 + item for item in batch]
results.extend(batch_results)
return results
## 性能测试
test_data = list(range(10000))
batch_sizes = [1, 10, 100, 1000]
print(f"批量处理性能测试 (数据大小: {len(test_data)}):")
## 基准:逐个处理
start_time = time.time()
individual_result = process_individually(test_data)
individual_time = time.time() - start_time
print(f"逐个处理: {individual_time:.6f}秒")
## 批量处理测试
for batch_size in batch_sizes:
start_time = time.time()
batch_result = process_in_batches(test_data, batch_size)
batch_time = time.time() - start_time
speedup = individual_time / batch_time if batch_time > 0 else float('inf')
print(f"批量大小{batch_size:4d}: {batch_time:.6f}秒 (加速: {speedup:.2f}x)")
# # 验证结果一致性
if individual_result == batch_result:
print(f" 结果一致 ✓")
else:
print(f" 结果不一致 ✗")
## 4. 缓存和预取优化
print("\n4. 缓存和预取优化:")
class PrefetchIterator:
"""预取迭代器"""
def __init__(self, iterable, prefetch_size=10):
self.iterator = iter(iterable)
self.prefetch_size = prefetch_size
self.buffer = []
self.exhausted = False
self._prefetch()
def _prefetch(self):
"""预取数据到缓冲区"""
while len(self.buffer) < self.prefetch_size and not self.exhausted:
try:
self.buffer.append(next(self.iterator))
except StopIteration:
self.exhausted = True
break
def __iter__(self):
return self
def __next__(self):
if not self.buffer and self.exhausted:
raise StopIteration
if not self.buffer:
self._prefetch()
if self.buffer:
value = self.buffer.pop(0)
self._prefetch() # 保持缓冲区满
return value
else:
raise StopIteration
def slow_data_generator(size):
"""模拟慢速数据生成"""
for i in range(size):
time.sleep(0.001) # 模拟 I/O 延迟
yield i ** 2
## 比较预取效果
data_size = 100
print(f"预取优化测试 (数据大小: {data_size}):")
## 普通迭代器
start_time = time.time()
normal_iter = slow_data_generator(data_size)
normal_results = list(normal_iter)
normal_time = time.time() - start_time
## 预取迭代器
start_time = time.time()
prefetch_iter = PrefetchIterator(slow_data_generator(data_size), prefetch_size=5)
prefetch_results = list(prefetch_iter)
prefetch_time = time.time() - start_time
print(f"普通迭代器: {normal_time:.6f}秒")
print(f"预取迭代器: {prefetch_time:.6f}秒")
print(f"性能提升: {normal_time/prefetch_time:.2f}x")
print(f"结果一致: {normal_results == prefetch_results}")
print("\n 性能优化总结:")
print("1. 对于大数据集的部分处理,使用生成器表达式")
print("2. 使用内存优化迭代器避免预先加载所有数据")
print("3. 批量处理可以提高某些操作的效率")
print("4. 预取机制可以隐藏 I/O 延迟")
print("5. 根据具体场景选择合适的优化策略")
print("6. 平衡内存使用和计算性能")
📚 相关函数和模块
相关内置函数
next()- 获取迭代器下一个元素enumerate()- 枚举迭代器zip()- 并行迭代多个序列map()- 映射函数到迭代器filter()- 过滤迭代器元素reversed()- 反向迭代器range()- 范围迭代器list()- 列表构造函数tuple()- 元组构造函数
相关标准库模块
itertools- 迭代工具(chain, cycle, repeat, islice 等)functools- 函数工具(reduce 等)operator- 运算符函数collections- 集合类型(deque 等)typing- 类型提示(Iterator, Iterable 等)
相关第三方库
more-itertools- 扩展的迭代工具numpy- 数值计算(nditer 等)pandas- 数据分析(iterrows, itertuples 等)asyncio- 异步迭代器
🔗 扩展阅读
🏷️ 标签
迭代器 生成器 循环 数据流 惰性求值 内存优化 函数式编程
最后更新: 2024-01-15
作者: Python 文档工程师
版本: 1.0
讨论与反馈
欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。