open() - 文件打开函数
打开文件并返回文件对象的内置函数
open() - 文件打开函数
📝 概述
open() 是 Python 中用于文件操作的核心内置函数,它打开一个文件并返回对应的文件对象。通过这个函数,我们可以读取、写入、修改文件内容,支持文本和二进制两种模式,是 Python 文件 I/O 操作的基础。
🎯 学习目标
- 掌握 open()函数的完整语法和所有参数
- 理解不同文件模式的用途和区别
- 学会处理文本和二进制文件
- 掌握文件编码和错误处理
- 学会使用上下文管理器安全操作文件
📋 前置知识
- Python 基本语法
- 字符串操作
- 异常处理基础
- 上下文管理器(with 语句)
- 编码概念(UTF-8、ASCII 等)
🔍 详细内容
基本概念
open() 函数打开指定路径的文件,返回一个文件对象,该对象提供了读取、写入文件内容的方法。文件对象支持迭代器协议,可以逐行读取文件内容。
语法格式
open(file, mode='r', buffering=-1, encoding=None, errors=None,
newline=None, closefd=True, opener=None)
参数说明
| 参数名 | 类型 | 必需 | 默认值 | 说明 |
|---|---|---|---|---|
| file | str/PathLike | 是 | 无 | 文件路径或文件描述符 |
| mode | str | 否 | ‘r’ | 文件打开模式 |
| buffering | int | 否 | -1 | 缓冲策略 |
| encoding | str | 否 | None | 文本编码方式 |
| errors | str | 否 | None | 编码错误处理方式 |
| newline | str | 否 | None | 换行符处理方式 |
| closefd | bool | 否 | True | 是否关闭文件描述符 |
| opener | callable | 否 | None | 自定义文件打开器 |
文件模式详解
| 模式 | 说明 | 文件存在 | 文件不存在 | 读取 | 写入 | 指针位置 |
|---|---|---|---|---|---|---|
| ‘r’ | 只读(默认) | 打开 | 报错 | ✓ | ✗ | 开头 |
| ‘w’ | 只写 | 清空后打开 | 创建 | ✗ | ✓ | 开头 |
| ‘a’ | 追加写入 | 打开 | 创建 | ✗ | ✓ | 末尾 |
| ‘x’ | 独占创建 | 报错 | 创建 | ✗ | ✓ | 开头 |
| ‘r+’ | 读写 | 打开 | 报错 | ✓ | ✓ | 开头 |
| ‘w+’ | 读写 | 清空后打开 | 创建 | ✓ | ✓ | 开头 |
| ‘a+’ | 读写追加 | 打开 | 创建 | ✓ | ✓ | 末尾 |
模式修饰符
| 修饰符 | 说明 |
|---|---|
| ‘b’ | 二进制模式(如 ‘rb’, ‘wb’) |
| ‘t’ | 文本模式(默认,如 ‘rt’, ‘wt’) |
返回值
- 类型: TextIOWrapper(文本模式)或 BufferedReader/BufferedWriter(二进制模式)
- 说明: 文件对象,支持读写操作和上下文管理器协议
💡 实际应用
基础文件操作
## 基础文件读取
print("基础文件操作示例:")
## 创建示例文件
sample_content = """第一行内容
第二行内容
第三行内容
包含中文的行
数字行: 123456
特殊字符: !@#$%^&*()
空行在下面:
最后一行"""
## 写入示例文件
with open("sample.txt", "w", encoding="utf-8") as f:
f.write(sample_content)
print("已创建示例文件 sample.txt")
## 1. 读取整个文件
print("\n1. 读取整个文件:")
with open("sample.txt", "r", encoding="utf-8") as f:
content = f.read()
print(f"文件内容:\n{content}")
print(f"文件大小: {len(content)} 字符")
## 2. 逐行读取
print("\n2. 逐行读取:")
with open("sample.txt", "r", encoding="utf-8") as f:
line_number = 1
for line in f:
print(f"第{line_number}行: {line.rstrip()}")
line_number += 1
## 3. 读取所有行到列表
print("\n3. 读取所有行到列表:")
with open("sample.txt", "r", encoding="utf-8") as f:
lines = f.readlines()
print(f"总共 {len(lines)} 行")
for i, line in enumerate(lines, 1):
print(f"行{i}: {repr(line)}")
## 4. 读取指定字符数
print("\n4. 读取指定字符数:")
with open("sample.txt", "r", encoding="utf-8") as f:
first_20_chars = f.read(20)
print(f"前 20 个字符: {repr(first_20_chars)}")
next_10_chars = f.read(10)
print(f"接下来 10 个字符: {repr(next_10_chars)}")
## 5. 读取一行
print("\n5. 读取一行:")
with open("sample.txt", "r", encoding="utf-8") as f:
first_line = f.readline()
second_line = f.readline()
print(f"第一行: {repr(first_line)}")
print(f"第二行: {repr(second_line)}")
## 6. 文件指针操作
print("\n6. 文件指针操作:")
with open("sample.txt", "r", encoding="utf-8") as f:
print(f"初始位置: {f.tell()}")
data = f.read(10)
print(f"读取 10 字符后位置: {f.tell()}")
print(f"读取的内容: {repr(data)}")
f.seek(0) # 回到文件开头
print(f"seek(0)后位置: {f.tell()}")
f.seek(5) # 移动到第 5 个字符
print(f"seek(5)后位置: {f.tell()}")
data = f.read(5)
print(f"从位置 5 读取 5 字符: {repr(data)}")
## 7. 检查文件状态
print("\n7. 文件对象属性:")
with open("sample.txt", "r", encoding="utf-8") as f:
print(f"文件名: {f.name}")
print(f"文件模式: {f.mode}")
print(f"编码: {f.encoding}")
print(f"是否关闭: {f.closed}")
print(f"是否可读: {f.readable()}")
print(f"是否可写: {f.writable()}")
print(f"是否可定位: {f.seekable()}")
print(f"文件关闭后是否关闭: {f.closed}")
## 8. 不同读取方式的性能对比
print("\n8. 读取方式性能对比:")
import time
## 创建较大的测试文件
large_content = "\n".join([f"这是第{i}行内容" for i in range(1000)])
with open("large_sample.txt", "w", encoding="utf-8") as f:
f.write(large_content)
## 方法 1: read()全部读取
start_time = time.time()
with open("large_sample.txt", "r", encoding="utf-8") as f:
content = f.read()
time_read_all = time.time() - start_time
## 方法 2: readlines()读取所有行
start_time = time.time()
with open("large_sample.txt", "r", encoding="utf-8") as f:
lines = f.readlines()
time_readlines = time.time() - start_time
## 方法 3: 逐行迭代
start_time = time.time()
with open("large_sample.txt", "r", encoding="utf-8") as f:
for line in f:
pass # 只是迭代,不做处理
time_iterate = time.time() - start_time
print(f"read()全部读取: {time_read_all:.6f}秒")
print(f"readlines()读取: {time_readlines:.6f}秒")
print(f"逐行迭代: {time_iterate:.6f}秒")
## 清理测试文件
import os
os.remove("large_sample.txt")
文件写入操作
## 文件写入操作
print("文件写入操作示例:")
## 1. 基本写入(覆盖模式)
print("\n1. 基本写入(覆盖模式):")
with open("write_test.txt", "w", encoding="utf-8") as f:
f.write("这是第一行\n")
f.write("这是第二行\n")
f.write("这是第三行")
with open("write_test.txt", "r", encoding="utf-8") as f:
print(f"写入结果:\n{f.read()}")
## 2. 追加写入
print("\n2. 追加写入:")
with open("write_test.txt", "a", encoding="utf-8") as f:
f.write("\n 这是追加的第四行\n")
f.write("这是追加的第五行")
with open("write_test.txt", "r", encoding="utf-8") as f:
print(f"追加后结果:\n{f.read()}")
## 3. 写入多行
print("\n3. 写入多行:")
lines_to_write = [
"多行写入示例\n",
"第二行内容\n",
"第三行内容\n",
"最后一行"
]
with open("multiline_test.txt", "w", encoding="utf-8") as f:
f.writelines(lines_to_write)
with open("multiline_test.txt", "r", encoding="utf-8") as f:
print(f"多行写入结果:\n{f.read()}")
## 4. 格式化写入
print("\n4. 格式化写入:")
data = [
{"name": "张三", "age": 25, "score": 95.5},
{"name": "李四", "age": 30, "score": 87.2},
{"name": "王五", "age": 28, "score": 92.8}
]
with open("formatted_data.txt", "w", encoding="utf-8") as f:
f.write("学生成绩表\n")
f.write("=" * 30 + "\n")
f.write(f"{'姓名':<10}{'年龄':<10}{'分数':<10}\n")
f.write("-" * 30 + "\n")
for student in data:
f.write(f"{student['name']:<10}{student['age']:<10}{student['score']:<10.1f}\n")
with open("formatted_data.txt", "r", encoding="utf-8") as f:
print(f"格式化写入结果:\n{f.read()}")
## 5. CSV 格式写入
print("\n5. CSV 格式写入:")
csv_data = [
["姓名", "年龄", "城市", "职业"],
["张三", "25", "北京", "工程师"],
["李四", "30", "上海", "设计师"],
["王五", "28", "广州", "产品经理"]
]
with open("data.csv", "w", encoding="utf-8") as f:
for row in csv_data:
f.write(",".join(map(str, row)) + "\n")
with open("data.csv", "r", encoding="utf-8") as f:
print(f"CSV 写入结果:\n{f.read()}")
## 6. JSON 格式写入
print("\n6. JSON 格式写入:")
import json
json_data = {
"users": [
{"id": 1, "name": "张三", "email": "zhangsan@example.com"},
{"id": 2, "name": "李四", "email": "lisi@example.com"}
],
"settings": {
"theme": "dark",
"language": "zh-CN"
}
}
with open("data.json", "w", encoding="utf-8") as f:
json.dump(json_data, f, ensure_ascii=False, indent=2)
with open("data.json", "r", encoding="utf-8") as f:
print(f"JSON 写入结果:\n{f.read()}")
## 7. 实时写入和刷新
print("\n7. 实时写入和刷新:")
with open("realtime_log.txt", "w", encoding="utf-8") as f:
for i in range(5):
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] 处理第 {i+1} 项任务\n"
f.write(log_entry)
f.flush() # 强制刷新到磁盘
print(f"已写入: {log_entry.strip()}")
time.sleep(0.5)
with open("realtime_log.txt", "r", encoding="utf-8") as f:
print(f"\n 实时日志结果:\n{f.read()}")
## 8. 读写模式(r+, w+, a+)
print("\n8. 读写模式示例:")
## r+ 模式:读写,文件必须存在
with open("write_test.txt", "r+", encoding="utf-8") as f:
content = f.read()
print(f"r+ 模式读取内容长度: {len(content)} 字符")
f.seek(0, 2) # 移动到文件末尾
f.write("\nr+ 模式追加的内容")
f.seek(0) # 回到开头
updated_content = f.read()
print(f"r+ 模式更新后长度: {len(updated_content)} 字符")
## w+ 模式:读写,清空文件内容
with open("w_plus_test.txt", "w+", encoding="utf-8") as f:
f.write("w+ 模式写入的内容\n")
f.write("第二行内容")
f.seek(0) # 回到开头读取
content = f.read()
print(f"w+ 模式内容: {repr(content)}")
## a+ 模式:读写,追加模式
with open("a_plus_test.txt", "w", encoding="utf-8") as f:
f.write("初始内容\n")
with open("a_plus_test.txt", "a+", encoding="utf-8") as f:
f.write("a+ 模式追加内容\n")
f.seek(0) # 回到开头读取
content = f.read()
print(f"a+ 模式内容: {repr(content)}")
## 9. 独占创建模式(x)
print("\n9. 独占创建模式:")
try:
with open("exclusive_test.txt", "x", encoding="utf-8") as f:
f.write("独占创建的文件内容")
print("独占创建成功")
except FileExistsError:
print("文件已存在,独占创建失败")
## 再次尝试创建同名文件
try:
with open("exclusive_test.txt", "x", encoding="utf-8") as f:
f.write("这不会被写入")
except FileExistsError:
print("文件已存在,无法独占创建")
## 清理测试文件
test_files = [
"write_test.txt", "multiline_test.txt", "formatted_data.txt",
"data.csv", "data.json", "realtime_log.txt", "w_plus_test.txt",
"a_plus_test.txt", "exclusive_test.txt"
]
for filename in test_files:
try:
os.remove(filename)
except FileNotFoundError:
pass
print("\n 已清理测试文件")
二进制文件操作
## 二进制文件操作
print("二进制文件操作示例:")
## 1. 创建二进制数据
print("\n1. 创建和写入二进制数据:")
binary_data = bytes([0x48, 0x65, 0x6C, 0x6C, 0x6F]) # "Hello" 的 ASCII 码
print(f"二进制数据: {binary_data}")
print(f"转换为字符串: {binary_data.decode('ascii')}")
## 写入二进制文件
with open("binary_test.bin", "wb") as f:
f.write(binary_data)
f.write(b"\x20World!") # 添加 " World!"
print("已写入二进制文件")
## 2. 读取二进制文件
print("\n2. 读取二进制文件:")
with open("binary_test.bin", "rb") as f:
data = f.read()
print(f"读取的二进制数据: {data}")
print(f"转换为字符串: {data.decode('ascii')}")
print(f"数据长度: {len(data)} 字节")
## 3. 分块读取二进制文件
print("\n3. 分块读取二进制文件:")
with open("binary_test.bin", "rb") as f:
chunk_size = 3
chunk_num = 1
while True:
chunk = f.read(chunk_size)
if not chunk:
break
print(f"块 {chunk_num}: {chunk} -> {chunk.decode('ascii', errors='ignore')}")
chunk_num += 1
## 4. 处理图像文件(模拟)
print("\n4. 处理图像文件头信息:")
## 创建模拟的图像文件头
image_header = b'\x89PNG\r\n\x1a\n' # PNG 文件头
image_data = b'\x00' * 100 # 模拟图像数据
with open("image_test.png", "wb") as f:
f.write(image_header)
f.write(image_data)
## 读取并检查文件头
with open("image_test.png", "rb") as f:
header = f.read(8)
print(f"文件头: {header}")
if header.startswith(b'\x89PNG'):
print("这是一个 PNG 文件")
else:
print("这不是 PNG 文件")
# # 获取文件大小
f.seek(0, 2) # 移动到文件末尾
file_size = f.tell()
print(f"文件大小: {file_size} 字节")
## 5. 数字数据的二进制存储
print("\n5. 数字数据的二进制存储:")
import struct
## 打包数字数据
numbers = [1, 2, 3, 4, 5]
packed_data = struct.pack('5i', *numbers) # 5 个整数
print(f"打包的数据: {packed_data}")
print(f"数据长度: {len(packed_data)} 字节")
## 写入二进制文件
with open("numbers.bin", "wb") as f:
f.write(packed_data)
## 读取并解包
with open("numbers.bin", "rb") as f:
data = f.read()
unpacked = struct.unpack('5i', data)
print(f"解包的数据: {unpacked}")
## 6. 混合数据类型的二进制存储
print("\n6. 混合数据类型存储:")
## 存储字符串长度 + 字符串 + 整数 + 浮点数
text = "Hello, Binary!"
text_bytes = text.encode('utf-8')
text_length = len(text_bytes)
integer_value = 42
float_value = 3.14159
with open("mixed_data.bin", "wb") as f:
# # 写入字符串长度(4 字节整数)
f.write(struct.pack('I', text_length))
# # 写入字符串
f.write(text_bytes)
# # 写入整数(4 字节)
f.write(struct.pack('i', integer_value))
# # 写入浮点数(8 字节双精度)
f.write(struct.pack('d', float_value))
## 读取混合数据
with open("mixed_data.bin", "rb") as f:
# # 读取字符串长度
text_length = struct.unpack('I', f.read(4))[0]
print(f"字符串长度: {text_length}")
# # 读取字符串
text_data = f.read(text_length)
decoded_text = text_data.decode('utf-8')
print(f"字符串: {decoded_text}")
# # 读取整数
integer_data = struct.unpack('i', f.read(4))[0]
print(f"整数: {integer_data}")
# # 读取浮点数
float_data = struct.unpack('d', f.read(8))[0]
print(f"浮点数: {float_data}")
## 7. 文件复制(二进制方式)
print("\n7. 二进制文件复制:")
def copy_file_binary(source, destination, chunk_size=8192):
"""二进制方式复制文件"""
with open(source, "rb") as src, open(destination, "wb") as dst:
while True:
chunk = src.read(chunk_size)
if not chunk:
break
dst.write(chunk)
print(f"文件复制完成: {source} -> {destination}")
## 复制之前创建的二进制文件
copy_file_binary("binary_test.bin", "binary_test_copy.bin")
## 验证复制结果
with open("binary_test.bin", "rb") as f1, open("binary_test_copy.bin", "rb") as f2:
original = f1.read()
copied = f2.read()
print(f"复制验证: {original == copied}")
## 8. 大文件处理
print("\n8. 大文件处理示例:")
## 创建一个较大的二进制文件
large_data = b'A' * 10000 # 10KB 的数据
with open("large_binary.bin", "wb") as f:
f.write(large_data)
## 分块处理大文件
def process_large_binary_file(filename, chunk_size=1024):
"""分块处理大二进制文件"""
total_size = 0
chunk_count = 0
with open(filename, "rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
chunk_count += 1
total_size += len(chunk)
# # 这里可以对 chunk 进行处理
# # 例如:计算校验和、压缩、加密等
print(f"处理完成: {chunk_count} 个块, 总大小: {total_size} 字节")
return total_size
processed_size = process_large_binary_file("large_binary.bin")
## 清理二进制测试文件
binary_test_files = [
"binary_test.bin", "image_test.png", "numbers.bin",
"mixed_data.bin", "binary_test_copy.bin", "large_binary.bin"
]
for filename in binary_test_files:
try:
os.remove(filename)
except FileNotFoundError:
pass
print("\n 已清理二进制测试文件")
编码和错误处理
## 编码和错误处理
print("编码和错误处理示例:")
## 1. 不同编码的文件处理
print("\n1. 不同编码的文件处理:")
## 创建包含各种字符的测试内容
test_content = """英文: Hello World
中文: 你好世界
日文: こんにちは世界
韩文: 안녕하세요 세계
俄文: Привет мир
阿拉伯文: مرحبا بالعالم
表情符号: 😀😊🎉
特殊符号: ©®™€£¥
数学符号: ∑∏∫√∞
"""
## 使用不同编码保存文件
encodings = ['utf-8', 'utf-16', 'gbk', 'ascii']
for encoding in encodings:
filename = f"test_{encoding.replace('-', '_')}.txt"
try:
with open(filename, "w", encoding=encoding) as f:
f.write(test_content)
print(f"成功使用 {encoding} 编码保存文件")
# # 读取文件并显示大小
with open(filename, "r", encoding=encoding) as f:
content = f.read()
print(f" 文件大小: {len(content)} 字符")
except UnicodeEncodeError as e:
print(f"使用 {encoding} 编码失败: {e}")
## 2. 编码检测和转换
print("\n2. 编码检测和转换:")
## 尝试用错误的编码读取文件
print("尝试用不同编码读取 UTF-8 文件:")
with open("test_utf_8.txt", "rb") as f:
raw_data = f.read()
print(f"原始字节数据长度: {len(raw_data)} 字节")
## 尝试不同编码解码
test_encodings = ['utf-8', 'gbk', 'latin-1', 'ascii']
for encoding in test_encodings:
try:
decoded = raw_data.decode(encoding)
print(f"使用 {encoding} 解码成功,长度: {len(decoded)} 字符")
print(f" 前 50 字符: {decoded[:50]}...")
except UnicodeDecodeError as e:
print(f"使用 {encoding} 解码失败: {e}")
## 3. 错误处理策略
print("\n3. 错误处理策略:")
## 创建包含无法编码字符的内容
problematic_content = "正常文字 \udcff 无效字符 \udcfe 更多文字"
error_strategies = ['strict', 'ignore', 'replace', 'xmlcharrefreplace', 'backslashreplace']
for strategy in error_strategies:
filename = f"error_test_{strategy}.txt"
try:
with open(filename, "w", encoding="ascii", errors=strategy) as f:
f.write("ASCII can handle this")
# # 尝试写入非 ASCII 字符
f.write("\n 中文字符测试")
print(f"错误策略 {strategy}: 写入成功")
except UnicodeEncodeError as e:
print(f"错误策略 {strategy}: 写入失败 - {e}")
## 读取时的错误处理
print("\n 读取时的错误处理:")
## 创建一个包含混合编码的文件
with open("mixed_encoding.txt", "wb") as f:
f.write("Hello ".encode('utf-8'))
f.write("世界".encode('gbk')) # 故意使用不同编码
f.write(" World".encode('utf-8'))
## 尝试用不同错误策略读取
for strategy in ['strict', 'ignore', 'replace']:
try:
with open("mixed_encoding.txt", "r", encoding="utf-8", errors=strategy) as f:
content = f.read()
print(f"错误策略 {strategy}: {repr(content)}")
except UnicodeDecodeError as e:
print(f"错误策略 {strategy}: 读取失败 - {e}")
## 4. 自动编码检测(简单版本)
print("\n4. 简单编码检测:")
def detect_encoding_simple(filename):
"""简单的编码检测"""
encodings_to_try = ['utf-8', 'gbk', 'utf-16', 'latin-1']
for encoding in encodings_to_try:
try:
with open(filename, "r", encoding=encoding) as f:
content = f.read()
return encoding, content
except UnicodeDecodeError:
continue
return None, None
## 测试编码检测
test_files = ['test_utf_8.txt', 'test_gbk.txt']
for filename in test_files:
if os.path.exists(filename):
detected_encoding, content = detect_encoding_simple(filename)
if detected_encoding:
print(f"文件 {filename} 检测到编码: {detected_encoding}")
print(f" 内容预览: {content[:50]}...")
else:
print(f"文件 {filename} 无法检测编码")
## 5. 编码转换工具
print("\n5. 编码转换工具:")
def convert_file_encoding(source_file, target_file, source_encoding, target_encoding):
"""转换文件编码"""
try:
with open(source_file, "r", encoding=source_encoding) as src:
content = src.read()
with open(target_file, "w", encoding=target_encoding) as dst:
dst.write(content)
print(f"编码转换成功: {source_encoding} -> {target_encoding}")
return True
except Exception as e:
print(f"编码转换失败: {e}")
return False
## 转换 UTF-8 文件到 GBK
if os.path.exists("test_utf_8.txt"):
success = convert_file_encoding(
"test_utf_8.txt", "converted_gbk.txt", "utf-8", "gbk"
)
if success:
# # 验证转换结果
with open("converted_gbk.txt", "r", encoding="gbk") as f:
converted_content = f.read()
print(f"转换后内容长度: {len(converted_content)} 字符")
## 6. BOM(字节顺序标记)处理
print("\n6. BOM 处理:")
## 创建带 BOM 的 UTF-8 文件
with open("bom_test.txt", "wb") as f:
f.write(b'\xef\xbb\xbf') # UTF-8 BOM
f.write("带 BOM 的 UTF-8 文件内容".encode('utf-8'))
## 读取带 BOM 的文件
with open("bom_test.txt", "r", encoding="utf-8-sig") as f:
content_with_bom_handling = f.read()
print(f"使用 utf-8-sig 读取: {repr(content_with_bom_handling)}")
with open("bom_test.txt", "r", encoding="utf-8") as f:
content_without_bom_handling = f.read()
print(f"使用 utf-8 读取: {repr(content_without_bom_handling)}")
## 7. 文件编码信息获取
print("\n7. 文件编码信息:")
def analyze_file_encoding(filename):
"""分析文件编码信息"""
try:
# # 读取文件的前几个字节检查 BOM
with open(filename, "rb") as f:
first_bytes = f.read(4)
bom_info = ""
if first_bytes.startswith(b'\xef\xbb\xbf'):
bom_info = "UTF-8 BOM"
elif first_bytes.startswith(b'\xff\xfe'):
bom_info = "UTF-16 LE BOM"
elif first_bytes.startswith(b'\xfe\xff'):
bom_info = "UTF-16 BE BOM"
else:
bom_info = "无 BOM"
print(f"文件 {filename}:")
print(f" 前 4 字节: {first_bytes}")
print(f" BOM 信息: {bom_info}")
# # 获取文件大小
file_size = os.path.getsize(filename)
print(f" 文件大小: {file_size} 字节")
except Exception as e:
print(f"分析文件 {filename} 失败: {e}")
## 分析不同编码的文件
for filename in ['test_utf_8.txt', 'bom_test.txt']:
if os.path.exists(filename):
analyze_file_encoding(filename)
## 清理编码测试文件
encoding_test_files = [
'test_utf_8.txt', 'test_utf_16.txt', 'test_gbk.txt',
'error_test_strict.txt', 'error_test_ignore.txt', 'error_test_replace.txt',
'error_test_xmlcharrefreplace.txt', 'error_test_backslashreplace.txt',
'mixed_encoding.txt', 'converted_gbk.txt', 'bom_test.txt'
]
for filename in encoding_test_files:
try:
os.remove(filename)
except FileNotFoundError:
pass
print("\n 已清理编码测试文件")
高级文件操作
## 高级文件操作
print("高级文件操作示例:")
## 1. 文件锁定(简单实现)
print("\n1. 文件锁定机制:")
import fcntl
import time
import threading
def write_with_lock(filename, content, delay=0):
"""带锁的文件写入"""
try:
with open(filename, "a", encoding="utf-8") as f:
# # 在 Unix 系统上使用文件锁
if hasattr(fcntl, 'flock'):
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
f.write(f"[{threading.current_thread().name}] {content}\n")
if delay:
time.sleep(delay)
# # 锁会在文件关闭时自动释放
print(f"线程 {threading.current_thread().name} 写入完成")
except Exception as e:
print(f"写入失败: {e}")
## 创建多个线程同时写入文件
lock_test_file = "lock_test.txt"
if os.path.exists(lock_test_file):
os.remove(lock_test_file)
threads = []
for i in range(3):
thread = threading.Thread(
target=write_with_lock,
args=(lock_test_file, f"消息来自线程{i}", 0.5),
name=f"Thread-{i}"
)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
## 检查写入结果
if os.path.exists(lock_test_file):
with open(lock_test_file, "r", encoding="utf-8") as f:
print(f"文件锁定测试结果:\n{f.read()}")
os.remove(lock_test_file)
## 2. 临时文件操作
print("\n2. 临时文件操作:")
import tempfile
## 创建临时文件
with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8", delete=False) as temp_file:
temp_filename = temp_file.name
temp_file.write("这是临时文件的内容\n")
temp_file.write("第二行内容")
print(f"创建临时文件: {temp_filename}")
## 读取临时文件
with open(temp_filename, "r", encoding="utf-8") as f:
temp_content = f.read()
print(f"临时文件内容: {temp_content}")
## 手动删除临时文件
os.remove(temp_filename)
print("临时文件已删除")
## 使用上下文管理器自动删除临时文件
with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8") as temp_file:
temp_file.write("自动删除的临时文件")
temp_file.seek(0)
content = temp_file.read()
print(f"自动删除临时文件内容: {content}")
print(f"临时文件路径: {temp_file.name}")
## 文件在这里自动删除
## 3. 内存映射文件
print("\n3. 内存映射文件:")
import mmap
## 创建测试文件
test_data = b"Hello, Memory Mapped File! " * 100
with open("mmap_test.txt", "wb") as f:
f.write(test_data)
## 使用内存映射读取文件
with open("mmap_test.txt", "r+b") as f:
with mmap.mmap(f.fileno(), 0) as mm:
print(f"内存映射文件大小: {len(mm)} 字节")
print(f"前 50 字节: {mm[:50]}")
# # 搜索内容
pos = mm.find(b"Memory")
if pos != -1:
print(f"找到 'Memory' 在位置: {pos}")
# # 修改内容
mm[0:5] = b"Hi!!!" # 替换前 5 个字节
# # 读取修改后的内容
mm.seek(0)
modified_content = mm.read(50)
print(f"修改后前 50 字节: {modified_content}")
## 验证修改
with open("mmap_test.txt", "rb") as f:
content = f.read(50)
print(f"文件修改验证: {content}")
os.remove("mmap_test.txt")
## 4. 文件监控(简单实现)
print("\n4. 文件监控:")
def monitor_file_changes(filename, check_interval=1, duration=5):
"""监控文件变化"""
if not os.path.exists(filename):
print(f"文件 {filename} 不存在")
return
last_modified = os.path.getmtime(filename)
last_size = os.path.getsize(filename)
print(f"开始监控文件: {filename}")
print(f"初始修改时间: {time.ctime(last_modified)}")
print(f"初始文件大小: {last_size} 字节")
start_time = time.time()
while time.time() - start_time < duration:
current_modified = os.path.getmtime(filename)
current_size = os.path.getsize(filename)
if current_modified != last_modified:
print(f"文件已修改: {time.ctime(current_modified)}")
last_modified = current_modified
if current_size != last_size:
print(f"文件大小变化: {last_size} -> {current_size} 字节")
last_size = current_size
time.sleep(check_interval)
print("监控结束")
## 创建测试文件并启动监控
monitor_test_file = "monitor_test.txt"
with open(monitor_test_file, "w", encoding="utf-8") as f:
f.write("初始内容")
## 在另一个线程中修改文件
def modify_file():
time.sleep(2)
with open(monitor_test_file, "a", encoding="utf-8") as f:
f.write("\n 添加的内容")
time.sleep(2)
with open(monitor_test_file, "a", encoding="utf-8") as f:
f.write("\n 更多内容")
modify_thread = threading.Thread(target=modify_file)
modify_thread.start()
## 监控文件(这里简化为短时间)
monitor_file_changes(monitor_test_file, check_interval=0.5, duration=3)
modify_thread.join()
os.remove(monitor_test_file)
## 5. 文件压缩和解压
print("\n5. 文件压缩和解压:")
import gzip
import zipfile
## 创建测试数据
test_content = "这是要压缩的测试内容。" * 100
## gzip 压缩
with open("test_data.txt", "w", encoding="utf-8") as f:
f.write(test_content)
original_size = os.path.getsize("test_data.txt")
## 使用 gzip 压缩
with open("test_data.txt", "rb") as f_in:
with gzip.open("test_data.txt.gz", "wb") as f_out:
f_out.write(f_in.read())
compressed_size = os.path.getsize("test_data.txt.gz")
compression_ratio = (1 - compressed_size / original_size) * 100
print(f"原始文件大小: {original_size} 字节")
print(f"压缩文件大小: {compressed_size} 字节")
print(f"压缩率: {compression_ratio:.2f}%")
## gzip 解压
with gzip.open("test_data.txt.gz", "rb") as f_in:
with open("test_data_decompressed.txt", "wb") as f_out:
f_out.write(f_in.read())
## 验证解压结果
with open("test_data_decompressed.txt", "r", encoding="utf-8") as f:
decompressed_content = f.read()
print(f"解压验证: {decompressed_content == test_content}")
## ZIP 压缩多个文件
with zipfile.ZipFile("test_archive.zip", "w") as zip_file:
zip_file.write("test_data.txt")
zip_file.write("test_data_decompressed.txt")
print(f"ZIP 文件大小: {os.path.getsize('test_archive.zip')} 字节")
## 列出 ZIP 文件内容
with zipfile.ZipFile("test_archive.zip", "r") as zip_file:
print("ZIP 文件内容:")
for info in zip_file.infolist():
print(f" {info.filename}: {info.file_size} 字节")
## 清理压缩测试文件
compression_test_files = [
"test_data.txt", "test_data.txt.gz",
"test_data_decompressed.txt", "test_archive.zip"
]
for filename in compression_test_files:
try:
os.remove(filename)
except FileNotFoundError:
pass
## 6. 文件备份和版本控制
print("\n6. 文件备份和版本控制:")
import shutil
from datetime import datetime
def backup_file(filename, backup_dir="backups"):
"""创建文件备份"""
if not os.path.exists(filename):
print(f"文件 {filename} 不存在")
return None
# # 创建备份目录
if not os.path.exists(backup_dir):
os.makedirs(backup_dir)
# # 生成备份文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
base_name = os.path.splitext(os.path.basename(filename))[0]
extension = os.path.splitext(filename)[1]
backup_filename = f"{base_name}_{timestamp}{extension}"
backup_path = os.path.join(backup_dir, backup_filename)
# # 复制文件
shutil.copy2(filename, backup_path)
print(f"备份创建: {filename} -> {backup_path}")
return backup_path
## 创建测试文件并备份
test_file = "backup_test.txt"
with open(test_file, "w", encoding="utf-8") as f:
f.write("原始版本的内容")
backup1 = backup_file(test_file)
## 修改文件并再次备份
time.sleep(1) # 确保时间戳不同
with open(test_file, "w", encoding="utf-8") as f:
f.write("修改后的内容")
backup2 = backup_file(test_file)
## 列出备份文件
if os.path.exists("backups"):
print("备份文件列表:")
for backup_file in os.listdir("backups"):
backup_path = os.path.join("backups", backup_file)
print(f" {backup_file}: {os.path.getsize(backup_path)} 字节")
## 清理备份测试
if os.path.exists(test_file):
os.remove(test_file)
if os.path.exists("backups"):
shutil.rmtree("backups")
print("\n 高级文件操作示例完成")
⚠️ 注意事项
常见陷阱和最佳实践
## open()函数的常见陷阱和最佳实践
print("open()函数常见陷阱和最佳实践:")
## 陷阱 1: 忘记关闭文件
print("\n 陷阱 1: 文件未正确关闭")
## 错误方式
print("错误方式(可能导致资源泄露):")
f = open("sample.txt", "w", encoding="utf-8")
f.write("这个文件可能不会被正确关闭")
## 忘记调用 f.close()
## 正确方式 1: 手动关闭
print("正确方式 1(手动关闭):")
f = open("sample.txt", "w", encoding="utf-8")
try:
f.write("手动关闭的文件")
finally:
f.close()
## 正确方式 2: 使用 with 语句(推荐)
print("正确方式 2(with 语句,推荐):")
with open("sample.txt", "w", encoding="utf-8") as f:
f.write("自动关闭的文件")
## 文件在这里自动关闭
## 陷阱 2: 编码问题
print("\n 陷阱 2: 编码问题")
## 创建包含中文的文件
with open("chinese_test.txt", "w", encoding="utf-8") as f:
f.write("中文内容测试")
## 错误方式:不指定编码或使用错误编码
try:
with open("chinese_test.txt", "r") as f: # 可能使用系统默认编码
content = f.read()
print(f"系统默认编码读取: {content}")
except UnicodeDecodeError as e:
print(f"编码错误: {e}")
## 正确方式:明确指定编码
with open("chinese_test.txt", "r", encoding="utf-8") as f:
content = f.read()
print(f"UTF-8 编码读取: {content}")
## 陷阱 3: 文件模式混淆
print("\n 陷阱 3: 文件模式混淆")
## 创建测试文件
with open("mode_test.txt", "w", encoding="utf-8") as f:
f.write("原始内容")
## 错误:在只读模式下尝试写入
try:
with open("mode_test.txt", "r", encoding="utf-8") as f:
f.write("尝试写入") # 这会失败
except io.UnsupportedOperation as e:
print(f"只读模式写入错误: {e}")
## 错误:在写入模式下尝试读取(w 模式会清空文件)
with open("mode_test.txt", "w", encoding="utf-8") as f:
f.write("新内容")
try:
f.seek(0)
content = f.read() # 这可能不会按预期工作
except io.UnsupportedOperation as e:
print(f"写入模式读取错误: {e}")
## 正确方式:使用适当的模式
with open("mode_test.txt", "r+", encoding="utf-8") as f:
original = f.read()
f.seek(0, 2) # 移动到文件末尾
f.write("\n 追加内容")
f.seek(0)
updated = f.read()
print(f"r+模式操作成功: {len(updated)} > {len(original)}")
## 陷阱 4: 路径问题
print("\n 陷阱 4: 文件路径问题")
## 相对路径 vs 绝对路径
current_dir = os.getcwd()
print(f"当前工作目录: {current_dir}")
## 相对路径
relative_path = "test_file.txt"
with open(relative_path, "w", encoding="utf-8") as f:
f.write("相对路径文件")
## 绝对路径
absolute_path = os.path.join(current_dir, "test_file_abs.txt")
with open(absolute_path, "w", encoding="utf-8") as f:
f.write("绝对路径文件")
print(f"相对路径文件: {os.path.exists(relative_path)}")
print(f"绝对路径文件: {os.path.exists(absolute_path)}")
## 路径分隔符问题
print("\n 路径分隔符处理:")
## 错误方式(硬编码路径分隔符)
## wrong_path = "data\\files\\test.txt" # Windows 风格
## wrong_path = "data/files/test.txt" # Unix 风格
## 正确方式(使用 os.path.join)
correct_path = os.path.join("data", "files", "test.txt")
print(f"正确的路径: {correct_path}")
## 陷阱 5: 大文件处理
print("\n 陷阱 5: 大文件处理")
## 错误方式:一次性读取大文件
print("错误方式(可能导致内存不足):")
## 创建一个较大的测试文件
large_content = "大文件内容行\n" * 10000
with open("large_file.txt", "w", encoding="utf-8") as f:
f.write(large_content)
## 这种方式对于非常大的文件可能导致内存问题
with open("large_file.txt", "r", encoding="utf-8") as f:
all_content = f.read() # 一次性读取所有内容
print(f"一次性读取: {len(all_content)} 字符")
## 正确方式:分块处理
print("正确方式(分块处理):")
chunk_size = 1024 # 1KB 块
total_chars = 0
with open("large_file.txt", "r", encoding="utf-8") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
total_chars += len(chunk)
# # 处理 chunk...
print(f"分块处理总字符数: {total_chars}")
## 陷阱 6: 文件存在性检查
print("\n 陷阱 6: 文件存在性检查")
## 错误方式:不检查文件是否存在
try:
with open("nonexistent_file.txt", "r", encoding="utf-8") as f:
content = f.read()
except FileNotFoundError as e:
print(f"文件不存在错误: {e}")
## 正确方式:先检查文件是否存在
filename = "maybe_exists.txt"
if os.path.exists(filename):
with open(filename, "r", encoding="utf-8") as f:
content = f.read()
print(f"文件存在,读取成功")
else:
print(f"文件 {filename} 不存在")
# # 可以选择创建文件或采取其他行动
with open(filename, "w", encoding="utf-8") as f:
f.write("新创建的文件")
print(f"已创建文件 {filename}")
## 陷阱 7: 缓冲区问题
print("\n 陷阱 7: 缓冲区问题")
## 创建一个需要实时写入的场景
log_file = "realtime.log"
with open(log_file, "w", encoding="utf-8") as f:
for i in range(3):
f.write(f"日志条目 {i+1}\n")
# # 不刷新缓冲区,内容可能不会立即写入磁盘
print(f"写入条目 {i+1}(可能还在缓冲区)")
time.sleep(0.5)
print("\n 使用 flush()强制刷新:")
with open(log_file, "w", encoding="utf-8") as f:
for i in range(3):
f.write(f"实时日志条目 {i+1}\n")
f.flush() # 强制刷新到磁盘
print(f"实时写入条目 {i+1}")
time.sleep(0.5)
## 陷阱 8: 异常处理不当
print("\n 陷阱 8: 异常处理")
## 错误方式:捕获过于宽泛的异常
try:
with open("test_exception.txt", "r", encoding="utf-8") as f:
content = f.read()
except Exception as e: # 过于宽泛
print(f"发生了某种错误: {e}")
## 正确方式:捕获具体的异常
try:
with open("test_exception.txt", "r", encoding="utf-8") as f:
content = f.read()
except FileNotFoundError:
print("文件不存在,创建新文件")
with open("test_exception.txt", "w", encoding="utf-8") as f:
f.write("新创建的文件内容")
except PermissionError:
print("没有权限访问文件")
except UnicodeDecodeError:
print("文件编码问题")
except OSError as e:
print(f"操作系统相关错误: {e}")
## 清理测试文件
test_files = [
"sample.txt", "chinese_test.txt", "mode_test.txt",
"test_file.txt", "test_file_abs.txt", "large_file.txt",
"maybe_exists.txt", "realtime.log", "test_exception.txt"
]
for filename in test_files:
try:
os.remove(filename)
except FileNotFoundError:
pass
print("\n 已清理所有测试文件")
🔧 性能优化
文件操作性能对比
## 文件操作性能对比
print("文件操作性能对比:")
import time
## 创建测试数据
test_lines = [f"这是第{i}行测试数据\n" for i in range(10000)]
test_content = "".join(test_lines)
print(f"测试数据: {len(test_lines)} 行, {len(test_content)} 字符")
## 性能测试 1: 不同写入方式
print("\n1. 写入性能对比:")
## 方法 1: 逐行写入
start_time = time.time()
with open("perf_test1.txt", "w", encoding="utf-8") as f:
for line in test_lines:
f.write(line)
time1 = time.time() - start_time
## 方法 2: 批量写入
start_time = time.time()
with open("perf_test2.txt", "w", encoding="utf-8") as f:
f.write(test_content)
time2 = time.time() - start_time
## 方法 3: writelines
start_time = time.time()
with open("perf_test3.txt", "w", encoding="utf-8") as f:
f.writelines(test_lines)
time3 = time.time() - start_time
print(f"逐行写入: {time1:.6f}秒")
print(f"批量写入: {time2:.6f}秒")
print(f"writelines: {time3:.6f}秒")
print(f"最快方法: {'批量写入' if time2 < min(time1, time3) else 'writelines' if time3 < time1 else '逐行写入'}")
## 性能测试 2: 不同读取方式
print("\n2. 读取性能对比:")
## 方法 1: read()全部读取
start_time = time.time()
with open("perf_test1.txt", "r", encoding="utf-8") as f:
content = f.read()
time1 = time.time() - start_time
## 方法 2: readlines()
start_time = time.time()
with open("perf_test1.txt", "r", encoding="utf-8") as f:
lines = f.readlines()
time2 = time.time() - start_time
## 方法 3: 逐行迭代
start_time = time.time()
lines_count = 0
with open("perf_test1.txt", "r", encoding="utf-8") as f:
for line in f:
lines_count += 1
time3 = time.time() - start_time
print(f"read()全部: {time1:.6f}秒")
print(f"readlines(): {time2:.6f}秒")
print(f"逐行迭代: {time3:.6f}秒 (处理了{lines_count}行)")
## 性能测试 3: 缓冲区大小影响
print("\n3. 缓冲区大小影响:")
buffer_sizes = [1024, 8192, 65536] # 1KB, 8KB, 64KB
for buffer_size in buffer_sizes:
start_time = time.time()
with open("perf_test1.txt", "r", encoding="utf-8", buffering=buffer_size) as f:
content = f.read()
elapsed = time.time() - start_time
print(f"缓冲区 {buffer_size//1024}KB: {elapsed:.6f}秒")
## 清理性能测试文件
for i in range(1, 4):
try:
os.remove(f"perf_test{i}.txt")
except FileNotFoundError:
pass
print("\n 性能测试完成")
📚 相关函数和模块
相关内置函数
相关标准库模块
os- 操作系统接口os.path- 路径操作pathlib- 面向对象的路径操作io- 核心 I/O 工具tempfile- 临时文件和目录shutil- 高级文件操作glob- 文件名模式匹配csv- CSV 文件读写json- JSON 数据处理pickle- Python 对象序列化
🔗 扩展阅读
最后更新: 2024-01-15
讨论与反馈
欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。