open() - 文件打开函数

打开文件并返回文件对象的内置函数

分类: builtins 难度: 中级 更新: 2024-01-15
文件操作 输入输出 文本处理 二进制

open() - 文件打开函数

📝 概述

open() 是 Python 中用于文件操作的核心内置函数,它打开一个文件并返回对应的文件对象。通过这个函数,我们可以读取、写入、修改文件内容,支持文本和二进制两种模式,是 Python 文件 I/O 操作的基础。

🎯 学习目标

  • 掌握 open()函数的完整语法和所有参数
  • 理解不同文件模式的用途和区别
  • 学会处理文本和二进制文件
  • 掌握文件编码和错误处理
  • 学会使用上下文管理器安全操作文件

📋 前置知识

  • Python 基本语法
  • 字符串操作
  • 异常处理基础
  • 上下文管理器(with 语句)
  • 编码概念(UTF-8、ASCII 等)

🔍 详细内容

基本概念

open() 函数打开指定路径的文件,返回一个文件对象,该对象提供了读取、写入文件内容的方法。文件对象支持迭代器协议,可以逐行读取文件内容。

语法格式

open(file, mode='r', buffering=-1, encoding=None, errors=None, 
     newline=None, closefd=True, opener=None)

参数说明

参数名 类型 必需 默认值 说明
file str/bytes/PathLike/int 是 - 文件路径或文件描述符
mode str 否 'r' 文件打开模式
buffering int 否 -1 缓冲策略
encoding str 否 None 文本编码方式(仅文本模式;None 表示使用系统默认编码)
errors str 否 None 编码错误处理方式(仅文本模式)
newline str 否 None 换行符处理方式(仅文本模式)
closefd bool 否 True 是否关闭文件描述符
opener callable 否 None 自定义文件打开器

文件模式详解

模式 说明 文件存在 文件不存在 读取 写入 指针位置
'r' 只读(默认) 打开 报错 是 否 开头
'w' 只写 清空后打开 创建 否 是 开头
'a' 追加写入 打开 创建 否 是 末尾
'x' 独占创建 报错 创建 否 是 开头
'r+' 读写 打开 报错 是 是 开头
'w+' 读写 清空后打开 创建 是 是 开头
'a+' 读写追加 打开 创建 是 是 末尾

模式修饰符

修饰符 说明
‘b’ 二进制模式(如 ‘rb’, ‘wb’)
‘t’ 文本模式(默认,如 ‘rt’, ‘wt’)

返回值

  • 类型: 文本模式返回 io.TextIOWrapper;二进制模式按读写方式返回 io.BufferedReader(只读)、io.BufferedWriter(只写/追加)或 io.BufferedRandom(读写);buffering=0 的二进制模式返回 io.FileIO
  • 说明: 文件对象,支持读写操作和上下文管理器协议

💡 实际应用

基础文件操作

## 基础文件读取
print("基础文件操作示例:")

## 创建示例文件
sample_content = """第一行内容
第二行内容
第三行内容
包含中文的行
数字行: 123456
特殊字符: !@#$%^&*()
空行在下面:

最后一行"""

## 写入示例文件
with open("sample.txt", "w", encoding="utf-8") as f:
    f.write(sample_content)

print("已创建示例文件 sample.txt")

## 1. 读取整个文件
print("\n1. 读取整个文件:")
with open("sample.txt", "r", encoding="utf-8") as f:
    content = f.read()
    print(f"文件内容:\n{content}")
    print(f"文件大小: {len(content)} 字符")

## 2. 逐行读取
print("\n2. 逐行读取:")
with open("sample.txt", "r", encoding="utf-8") as f:
    line_number = 1
    for line in f:
        print(f"第{line_number}行: {line.rstrip()}")
        line_number += 1

## 3. 读取所有行到列表
print("\n3. 读取所有行到列表:")
with open("sample.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()
    print(f"总共 {len(lines)} 行")
    for i, line in enumerate(lines, 1):
        print(f"行{i}: {repr(line)}")

## 4. 读取指定字符数
print("\n4. 读取指定字符数:")
with open("sample.txt", "r", encoding="utf-8") as f:
    first_20_chars = f.read(20)
    print(f"前 20 个字符: {repr(first_20_chars)}")
    
    next_10_chars = f.read(10)
    print(f"接下来 10 个字符: {repr(next_10_chars)}")

## 5. 读取一行
print("\n5. 读取一行:")
with open("sample.txt", "r", encoding="utf-8") as f:
    first_line = f.readline()
    second_line = f.readline()
    print(f"第一行: {repr(first_line)}")
    print(f"第二行: {repr(second_line)}")

## 6. File pointer operations
print("\n6. 文件指针操作:")
with open("sample.txt", "r", encoding="utf-8") as f:
    print(f"初始位置: {f.tell()}")

    data = f.read(10)
    print(f"读取 10 字符后位置: {f.tell()}")
    print(f"读取的内容: {repr(data)}")

    f.seek(0)  # back to the start of the file
    print(f"seek(0)后位置: {f.tell()}")

    # In text mode the only valid seek() targets are 0 and values returned by
    # tell(): the offset is an opaque position, not a character index.
    # sample.txt starts with 3-byte UTF-8 characters, so a literal seek(5)
    # would land inside a character and the next read() would raise
    # UnicodeDecodeError. Read past 5 characters and remember that position.
    f.read(5)
    after_five_chars = f.tell()

    f.seek(0)
    f.seek(after_five_chars)  # jump to just after the 5th character
    print(f"seek({after_five_chars})后位置: {f.tell()}")
    data = f.read(5)
    print(f"跳过前 5 个字符后读取 5 字符: {repr(data)}")

## 7. 检查文件状态
print("\n7. 文件对象属性:")
with open("sample.txt", "r", encoding="utf-8") as f:
    print(f"文件名: {f.name}")
    print(f"文件模式: {f.mode}")
    print(f"编码: {f.encoding}")
    print(f"是否关闭: {f.closed}")
    print(f"是否可读: {f.readable()}")
    print(f"是否可写: {f.writable()}")
    print(f"是否可定位: {f.seekable()}")

print(f"文件关闭后是否关闭: {f.closed}")

## 8. 不同读取方式的性能对比
print("\n8. 读取方式性能对比:")
import time

## 创建较大的测试文件
large_content = "\n".join([f"这是第{i}行内容" for i in range(1000)])
with open("large_sample.txt", "w", encoding="utf-8") as f:
    f.write(large_content)

## 方法 1: read()全部读取
start_time = time.time()
with open("large_sample.txt", "r", encoding="utf-8") as f:
    content = f.read()
time_read_all = time.time() - start_time

## 方法 2: readlines()读取所有行
start_time = time.time()
with open("large_sample.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()
time_readlines = time.time() - start_time

## 方法 3: 逐行迭代
start_time = time.time()
with open("large_sample.txt", "r", encoding="utf-8") as f:
    for line in f:
        pass  # 只是迭代,不做处理
time_iterate = time.time() - start_time

print(f"read()全部读取: {time_read_all:.6f}秒")
print(f"readlines()读取: {time_readlines:.6f}秒")
print(f"逐行迭代: {time_iterate:.6f}秒")

## 清理测试文件
import os
os.remove("large_sample.txt")

文件写入操作

## 文件写入操作
print("文件写入操作示例:")

## 1. 基本写入(覆盖模式)
print("\n1. 基本写入(覆盖模式):")
with open("write_test.txt", "w", encoding="utf-8") as f:
    f.write("这是第一行\n")
    f.write("这是第二行\n")
    f.write("这是第三行")

with open("write_test.txt", "r", encoding="utf-8") as f:
    print(f"写入结果:\n{f.read()}")

## 2. 追加写入
print("\n2. 追加写入:")
with open("write_test.txt", "a", encoding="utf-8") as f:
    f.write("\n 这是追加的第四行\n")
    f.write("这是追加的第五行")

with open("write_test.txt", "r", encoding="utf-8") as f:
    print(f"追加后结果:\n{f.read()}")

## 3. 写入多行
print("\n3. 写入多行:")
lines_to_write = [
    "多行写入示例\n",
    "第二行内容\n",
    "第三行内容\n",
    "最后一行"
]

with open("multiline_test.txt", "w", encoding="utf-8") as f:
    f.writelines(lines_to_write)

with open("multiline_test.txt", "r", encoding="utf-8") as f:
    print(f"多行写入结果:\n{f.read()}")

## 4. 格式化写入
print("\n4. 格式化写入:")
data = [
    {"name": "张三", "age": 25, "score": 95.5},
    {"name": "李四", "age": 30, "score": 87.2},
    {"name": "王五", "age": 28, "score": 92.8}
]

with open("formatted_data.txt", "w", encoding="utf-8") as f:
    f.write("学生成绩表\n")
    f.write("=" * 30 + "\n")
    f.write(f"{'姓名':<10}{'年龄':<10}{'分数':<10}\n")
    f.write("-" * 30 + "\n")
    
    for student in data:
        f.write(f"{student['name']:<10}{student['age']:<10}{student['score']:<10.1f}\n")

with open("formatted_data.txt", "r", encoding="utf-8") as f:
    print(f"格式化写入结果:\n{f.read()}")

## 5. CSV 格式写入
print("\n5. CSV 格式写入:")
csv_data = [
    ["姓名", "年龄", "城市", "职业"],
    ["张三", "25", "北京", "工程师"],
    ["李四", "30", "上海", "设计师"],
    ["王五", "28", "广州", "产品经理"]
]

with open("data.csv", "w", encoding="utf-8") as f:
    for row in csv_data:
        f.write(",".join(map(str, row)) + "\n")

with open("data.csv", "r", encoding="utf-8") as f:
    print(f"CSV 写入结果:\n{f.read()}")

## 6. JSON 格式写入
print("\n6. JSON 格式写入:")
import json

json_data = {
    "users": [
        {"id": 1, "name": "张三", "email": "zhangsan@example.com"},
        {"id": 2, "name": "李四", "email": "lisi@example.com"}
    ],
    "settings": {
        "theme": "dark",
        "language": "zh-CN"
    }
}

with open("data.json", "w", encoding="utf-8") as f:
    json.dump(json_data, f, ensure_ascii=False, indent=2)

with open("data.json", "r", encoding="utf-8") as f:
    print(f"JSON 写入结果:\n{f.read()}")

## 7. 实时写入和刷新
print("\n7. 实时写入和刷新:")
with open("realtime_log.txt", "w", encoding="utf-8") as f:
    for i in range(5):
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] 处理第 {i+1} 项任务\n"
        f.write(log_entry)
        f.flush()  # 强制刷新到磁盘
        print(f"已写入: {log_entry.strip()}")
        time.sleep(0.5)

with open("realtime_log.txt", "r", encoding="utf-8") as f:
    print(f"\n 实时日志结果:\n{f.read()}")

## 8. 读写模式(r+, w+, a+)
print("\n8. 读写模式示例:")

## r+ 模式:读写,文件必须存在
with open("write_test.txt", "r+", encoding="utf-8") as f:
    content = f.read()
    print(f"r+ 模式读取内容长度: {len(content)} 字符")
    
    f.seek(0, 2)  # 移动到文件末尾
    f.write("\nr+ 模式追加的内容")
    
    f.seek(0)  # 回到开头
    updated_content = f.read()
    print(f"r+ 模式更新后长度: {len(updated_content)} 字符")

## w+ 模式:读写,清空文件内容
with open("w_plus_test.txt", "w+", encoding="utf-8") as f:
    f.write("w+ 模式写入的内容\n")
    f.write("第二行内容")
    
    f.seek(0)  # 回到开头读取
    content = f.read()
    print(f"w+ 模式内容: {repr(content)}")

## a+ 模式:读写,追加模式
with open("a_plus_test.txt", "w", encoding="utf-8") as f:
    f.write("初始内容\n")

with open("a_plus_test.txt", "a+", encoding="utf-8") as f:
    f.write("a+ 模式追加内容\n")
    
    f.seek(0)  # 回到开头读取
    content = f.read()
    print(f"a+ 模式内容: {repr(content)}")

## 9. 独占创建模式(x)
print("\n9. 独占创建模式:")
try:
    with open("exclusive_test.txt", "x", encoding="utf-8") as f:
        f.write("独占创建的文件内容")
    print("独占创建成功")
except FileExistsError:
    print("文件已存在,独占创建失败")

## 再次尝试创建同名文件
try:
    with open("exclusive_test.txt", "x", encoding="utf-8") as f:
        f.write("这不会被写入")
except FileExistsError:
    print("文件已存在,无法独占创建")

## 清理测试文件
test_files = [
    "write_test.txt", "multiline_test.txt", "formatted_data.txt",
    "data.csv", "data.json", "realtime_log.txt", "w_plus_test.txt",
    "a_plus_test.txt", "exclusive_test.txt"
]

for filename in test_files:
    try:
        os.remove(filename)
    except FileNotFoundError:
        pass

print("\n 已清理测试文件")

二进制文件操作

## 二进制文件操作
print("二进制文件操作示例:")

## 1. 创建二进制数据
print("\n1. 创建和写入二进制数据:")
binary_data = bytes([0x48, 0x65, 0x6C, 0x6C, 0x6F])  # "Hello" 的 ASCII 码
print(f"二进制数据: {binary_data}")
print(f"转换为字符串: {binary_data.decode('ascii')}")

## 写入二进制文件
with open("binary_test.bin", "wb") as f:
    f.write(binary_data)
    f.write(b"\x20World!")  # 添加 " World!"

print("已写入二进制文件")

## 2. 读取二进制文件
print("\n2. 读取二进制文件:")
with open("binary_test.bin", "rb") as f:
    data = f.read()
    print(f"读取的二进制数据: {data}")
    print(f"转换为字符串: {data.decode('ascii')}")
    print(f"数据长度: {len(data)} 字节")

## 3. 分块读取二进制文件
print("\n3. 分块读取二进制文件:")
with open("binary_test.bin", "rb") as f:
    chunk_size = 3
    chunk_num = 1
    
    while True:
        chunk = f.read(chunk_size)
        if not chunk:
            break
        print(f"块 {chunk_num}: {chunk} -> {chunk.decode('ascii', errors='ignore')}")
        chunk_num += 1

## 4. 处理图像文件(模拟)
print("\n4. 处理图像文件头信息:")

## 创建模拟的图像文件头
image_header = b'\x89PNG\r\n\x1a\n'  # PNG 文件头
image_data = b'\x00' * 100  # 模拟图像数据

with open("image_test.png", "wb") as f:
    f.write(image_header)
    f.write(image_data)

## 读取并检查文件头
with open("image_test.png", "rb") as f:
    header = f.read(8)
    print(f"文件头: {header}")
    
    if header.startswith(b'\x89PNG'):
        print("这是一个 PNG 文件")
    else:
        print("这不是 PNG 文件")
    
#    # 获取文件大小
    f.seek(0, 2)  # 移动到文件末尾
    file_size = f.tell()
    print(f"文件大小: {file_size} 字节")

## 5. 数字数据的二进制存储
print("\n5. 数字数据的二进制存储:")
import struct

## 打包数字数据
numbers = [1, 2, 3, 4, 5]
packed_data = struct.pack('5i', *numbers)  # 5 个整数
print(f"打包的数据: {packed_data}")
print(f"数据长度: {len(packed_data)} 字节")

## 写入二进制文件
with open("numbers.bin", "wb") as f:
    f.write(packed_data)

## 读取并解包
with open("numbers.bin", "rb") as f:
    data = f.read()
    unpacked = struct.unpack('5i', data)
    print(f"解包的数据: {unpacked}")

## 6. 混合数据类型的二进制存储
print("\n6. 混合数据类型存储:")

## 存储字符串长度 + 字符串 + 整数 + 浮点数
text = "Hello, Binary!"
text_bytes = text.encode('utf-8')
text_length = len(text_bytes)
integer_value = 42
float_value = 3.14159

with open("mixed_data.bin", "wb") as f:
#    # 写入字符串长度(4 字节整数)
    f.write(struct.pack('I', text_length))
#    # 写入字符串
    f.write(text_bytes)
#    # 写入整数(4 字节)
    f.write(struct.pack('i', integer_value))
#    # 写入浮点数(8 字节双精度)
    f.write(struct.pack('d', float_value))

## 读取混合数据
with open("mixed_data.bin", "rb") as f:
#    # 读取字符串长度
    text_length = struct.unpack('I', f.read(4))[0]
    print(f"字符串长度: {text_length}")
    
#    # 读取字符串
    text_data = f.read(text_length)
    decoded_text = text_data.decode('utf-8')
    print(f"字符串: {decoded_text}")
    
#    # 读取整数
    integer_data = struct.unpack('i', f.read(4))[0]
    print(f"整数: {integer_data}")
    
#    # 读取浮点数
    float_data = struct.unpack('d', f.read(8))[0]
    print(f"浮点数: {float_data}")

## 7. 文件复制(二进制方式)
print("\n7. 二进制文件复制:")

def copy_file_binary(source, destination, chunk_size=8192):
    """Copy a file byte-for-byte, reading it in fixed-size chunks.

    Args:
        source: Path of the file to read (opened in "rb" mode).
        destination: Path of the file to write (opened in "wb" mode, so any
            existing content is replaced).
        chunk_size: Number of bytes requested per read.

    Returns:
        None. A completion message is printed once the copy is done.
    """
    with open(source, "rb") as reader:
        with open(destination, "wb") as writer:
            # read() yields an empty bytes object at end of file, which ends
            # the loop.
            block = reader.read(chunk_size)
            while block:
                writer.write(block)
                block = reader.read(chunk_size)
    print(f"文件复制完成: {source} -> {destination}")

## 复制之前创建的二进制文件
copy_file_binary("binary_test.bin", "binary_test_copy.bin")

## 验证复制结果
with open("binary_test.bin", "rb") as f1, open("binary_test_copy.bin", "rb") as f2:
    original = f1.read()
    copied = f2.read()
    print(f"复制验证: {original == copied}")

## 8. 大文件处理
print("\n8. 大文件处理示例:")

## 创建一个较大的二进制文件
large_data = b'A' * 10000  # 10KB 的数据
with open("large_binary.bin", "wb") as f:
    f.write(large_data)

## 分块处理大文件
def process_large_binary_file(filename, chunk_size=1024):
    """Walk through a binary file chunk by chunk and report its size.

    Args:
        filename: Path of the file to read (opened in "rb" mode).
        chunk_size: Number of bytes requested per read.

    Returns:
        The total number of bytes read. A summary line with the chunk count
        and total size is printed before returning.
    """
    pieces_seen = 0
    bytes_seen = 0

    with open(filename, "rb") as stream:
        piece = stream.read(chunk_size)
        while piece:
            pieces_seen += 1
            bytes_seen += len(piece)
            # Per-chunk work (checksum, compression, encryption, ...) would
            # go here.
            piece = stream.read(chunk_size)

    print(f"处理完成: {pieces_seen} 个块, 总大小: {bytes_seen} 字节")
    return bytes_seen

processed_size = process_large_binary_file("large_binary.bin")

## 清理二进制测试文件
binary_test_files = [
    "binary_test.bin", "image_test.png", "numbers.bin",
    "mixed_data.bin", "binary_test_copy.bin", "large_binary.bin"
]

for filename in binary_test_files:
    try:
        os.remove(filename)
    except FileNotFoundError:
        pass

print("\n 已清理二进制测试文件")

编码和错误处理

## 编码和错误处理
print("编码和错误处理示例:")

## 1. 不同编码的文件处理
print("\n1. 不同编码的文件处理:")

## 创建包含各种字符的测试内容
test_content = """英文: Hello World
中文: 你好世界
日文: こんにちは世界
韩文: 안녕하세요 세계
俄文: Привет мир
阿拉伯文: مرحبا بالعالم
表情符号: 😀😊🎉
特殊符号: ©®™€£¥
数学符号: ∑∏∫√∞
"""

## 使用不同编码保存文件
encodings = ['utf-8', 'utf-16', 'gbk', 'ascii']

for encoding in encodings:
    filename = f"test_{encoding.replace('-', '_')}.txt"
    try:
        with open(filename, "w", encoding=encoding) as f:
            f.write(test_content)
        print(f"成功使用 {encoding} 编码保存文件")
        
#        # 读取文件并显示大小
        with open(filename, "r", encoding=encoding) as f:
            content = f.read()
            print(f"  文件大小: {len(content)} 字符")
            
    except UnicodeEncodeError as e:
        print(f"使用 {encoding} 编码失败: {e}")

## 2. 编码检测和转换
print("\n2. 编码检测和转换:")

## 尝试用错误的编码读取文件
print("尝试用不同编码读取 UTF-8 文件:")
with open("test_utf_8.txt", "rb") as f:
    raw_data = f.read()
    print(f"原始字节数据长度: {len(raw_data)} 字节")

## 尝试不同编码解码
test_encodings = ['utf-8', 'gbk', 'latin-1', 'ascii']
for encoding in test_encodings:
    try:
        decoded = raw_data.decode(encoding)
        print(f"使用 {encoding} 解码成功,长度: {len(decoded)} 字符")
        print(f"  前 50 字符: {decoded[:50]}...")
    except UnicodeDecodeError as e:
        print(f"使用 {encoding} 解码失败: {e}")

## 3. 错误处理策略
print("\n3. 错误处理策略:")

## 创建包含无法编码字符的内容
problematic_content = "正常文字 \udcff 无效字符 \udcfe 更多文字"

error_strategies = ['strict', 'ignore', 'replace', 'xmlcharrefreplace', 'backslashreplace']

for strategy in error_strategies:
    filename = f"error_test_{strategy}.txt"
    try:
        with open(filename, "w", encoding="ascii", errors=strategy) as f:
            f.write("ASCII can handle this")
#            # 尝试写入非 ASCII 字符
            f.write("\n 中文字符测试")
        print(f"错误策略 {strategy}: 写入成功")
    except UnicodeEncodeError as e:
        print(f"错误策略 {strategy}: 写入失败 - {e}")

## 读取时的错误处理
print("\n 读取时的错误处理:")

## 创建一个包含混合编码的文件
with open("mixed_encoding.txt", "wb") as f:
    f.write("Hello ".encode('utf-8'))
    f.write("世界".encode('gbk'))  # 故意使用不同编码
    f.write(" World".encode('utf-8'))

## 尝试用不同错误策略读取
for strategy in ['strict', 'ignore', 'replace']:
    try:
        with open("mixed_encoding.txt", "r", encoding="utf-8", errors=strategy) as f:
            content = f.read()
            print(f"错误策略 {strategy}: {repr(content)}")
    except UnicodeDecodeError as e:
        print(f"错误策略 {strategy}: 读取失败 - {e}")

## 4. 自动编码检测(简单版本)
print("\n4. 简单编码检测:")

def detect_encoding_simple(filename):
    """Guess a file's text encoding by trial decoding.

    Each candidate encoding is tried in order and the first one that decodes
    the whole file without error wins. The result is only a guess: latin-1
    accepts every byte sequence, so it acts as the last-resort fallback.

    Args:
        filename: Path of the file to inspect.

    Returns:
        A ``(encoding, content)`` tuple for the first encoding that works, or
        ``(None, None)`` if none of the candidates can decode the file.
    """
    encodings_to_try = ['utf-8', 'gbk', 'utf-16', 'latin-1']

    for encoding in encodings_to_try:
        try:
            with open(filename, "r", encoding=encoding) as f:
                return encoding, f.read()
        except UnicodeError:
            # Catch the base class, not just UnicodeDecodeError: the utf-16
            # codec raises a plain UnicodeError when the stream has no BOM,
            # which would otherwise abort the search before latin-1 is tried.
            continue

    return None, None

## 测试编码检测
test_files = ['test_utf_8.txt', 'test_gbk.txt']
for filename in test_files:
    if os.path.exists(filename):
        detected_encoding, content = detect_encoding_simple(filename)
        if detected_encoding:
            print(f"文件 {filename} 检测到编码: {detected_encoding}")
            print(f"  内容预览: {content[:50]}...")
        else:
            print(f"文件 {filename} 无法检测编码")

## 5. 编码转换工具
print("\n5. 编码转换工具:")

def convert_file_encoding(source_file, target_file, source_encoding, target_encoding):
    """Re-encode a text file from one encoding to another.

    The whole source file is read into memory, so this is intended for files
    that comfortably fit in RAM.

    Args:
        source_file: Path of the file to read.
        target_file: Path of the file to write (replaced on success).
        source_encoding: Encoding used to decode ``source_file``.
        target_encoding: Encoding used to encode ``target_file``.

    Returns:
        True if the conversion succeeded, False otherwise. A success or
        failure message is printed in either case.
    """
    try:
        # newline="" turns off newline translation so the original line
        # endings (\n, \r\n or \r) survive the conversion unchanged.
        with open(source_file, "r", encoding=source_encoding, newline="") as src:
            content = src.read()

        # Encode before opening the target: if the text cannot be represented
        # in target_encoding, an existing target file is left untouched
        # instead of being truncated.
        encoded = content.encode(target_encoding)

        with open(target_file, "wb") as dst:
            dst.write(encoded)
    except (OSError, ValueError, LookupError) as e:
        # OSError: file access problems; ValueError: covers UnicodeError from
        # decoding/encoding; LookupError: unknown encoding name.
        print(f"编码转换失败: {e}")
        return False

    print(f"编码转换成功: {source_encoding} -> {target_encoding}")
    return True

## 转换 UTF-8 文件到 GBK
if os.path.exists("test_utf_8.txt"):
    success = convert_file_encoding(
        "test_utf_8.txt", "converted_gbk.txt", "utf-8", "gbk"
    )
    
    if success:
#        # 验证转换结果
        with open("converted_gbk.txt", "r", encoding="gbk") as f:
            converted_content = f.read()
            print(f"转换后内容长度: {len(converted_content)} 字符")

## 6. BOM(字节顺序标记)处理
print("\n6. BOM 处理:")

## 创建带 BOM 的 UTF-8 文件
with open("bom_test.txt", "wb") as f:
    f.write(b'\xef\xbb\xbf')  # UTF-8 BOM
    f.write("带 BOM 的 UTF-8 文件内容".encode('utf-8'))

## 读取带 BOM 的文件
with open("bom_test.txt", "r", encoding="utf-8-sig") as f:
    content_with_bom_handling = f.read()
    print(f"使用 utf-8-sig 读取: {repr(content_with_bom_handling)}")

with open("bom_test.txt", "r", encoding="utf-8") as f:
    content_without_bom_handling = f.read()
    print(f"使用 utf-8 读取: {repr(content_without_bom_handling)}")

## 7. 文件编码信息获取
print("\n7. 文件编码信息:")

def analyze_file_encoding(filename):
    """Print basic encoding-related facts about a file.

    Reports the first four bytes, which byte order mark (BOM) they match, if
    any, and the file size in bytes.

    Args:
        filename: Path of the file to inspect.

    Returns:
        None. Results, or a failure message, are printed.
    """
    # Order matters: the UTF-32 LE mark (FF FE 00 00) begins with the UTF-16
    # LE mark (FF FE), so the 4-byte marks must be tested before the 2-byte
    # ones. By convention FF FE 00 00 is reported as UTF-32 LE even though it
    # could also be UTF-16 LE text starting with a NUL character.
    bom_signatures = (
        (b'\xef\xbb\xbf', "UTF-8 BOM"),
        (b'\xff\xfe\x00\x00', "UTF-32 LE BOM"),
        (b'\x00\x00\xfe\xff', "UTF-32 BE BOM"),
        (b'\xff\xfe', "UTF-16 LE BOM"),
        (b'\xfe\xff', "UTF-16 BE BOM"),
    )

    try:
        # Only the first few bytes are needed to recognise a BOM.
        with open(filename, "rb") as f:
            first_bytes = f.read(4)

        bom_info = "无 BOM"
        for signature, label in bom_signatures:
            if first_bytes.startswith(signature):
                bom_info = label
                break

        print(f"文件 {filename}:")
        print(f"  前 4 字节: {first_bytes}")
        print(f"  BOM 信息: {bom_info}")

        file_size = os.path.getsize(filename)
        print(f"  文件大小: {file_size} 字节")

    except (OSError, ValueError) as e:
        # File access problems (missing file, permissions, invalid path).
        print(f"分析文件 {filename} 失败: {e}")

## 分析不同编码的文件
for filename in ['test_utf_8.txt', 'bom_test.txt']:
    if os.path.exists(filename):
        analyze_file_encoding(filename)

## Clean up the files created by the encoding examples
encoding_test_files = [
    'test_utf_8.txt', 'test_utf_16.txt', 'test_gbk.txt',
    # open() creates test_ascii.txt before the ASCII write fails, so it has
    # to be removed as well even though it never receives any content.
    'test_ascii.txt',
    'error_test_strict.txt', 'error_test_ignore.txt', 'error_test_replace.txt',
    'error_test_xmlcharrefreplace.txt', 'error_test_backslashreplace.txt',
    'mixed_encoding.txt', 'converted_gbk.txt', 'bom_test.txt'
]

for filename in encoding_test_files:
    try:
        os.remove(filename)
    except FileNotFoundError:
        # Already absent; nothing to clean up.
        pass

print("\n 已清理编码测试文件")

高级文件操作

## 高级文件操作
print("高级文件操作示例:")

## 1. 文件锁定(简单实现)
print("\n1. 文件锁定机制:")
import time
import threading

try:
    # fcntl only exists on Unix-like platforms. Importing it unconditionally
    # makes the whole script fail on Windows before the availability check
    # inside write_with_lock() can run.
    import fcntl
except ImportError:
    fcntl = None


def write_with_lock(filename, content, delay=0):
    """Append one line to a file while holding an exclusive advisory lock.

    The line written is ``[<current thread name>] <content>``. Where
    ``fcntl.flock`` is unavailable the write still happens, just without a
    lock.

    Args:
        filename: Path of the file to append to (created if missing).
        content: Text to write after the thread-name prefix.
        delay: Seconds to sleep while still holding the lock, to make lock
            contention visible in the demo. 0 disables the pause.

    Returns:
        None. A completion or failure message is printed.
    """
    try:
        with open(filename, "a", encoding="utf-8") as f:
            # Use an advisory file lock where the platform provides one.
            if fcntl is not None and hasattr(fcntl, 'flock'):
                fcntl.flock(f.fileno(), fcntl.LOCK_EX)

            f.write(f"[{threading.current_thread().name}] {content}\n")
            if delay:
                time.sleep(delay)

            # The lock is released automatically when the file is closed at
            # the end of the with block.
        print(f"线程 {threading.current_thread().name} 写入完成")
    except (OSError, ValueError) as e:
        # File access/locking problems, or text that cannot be encoded.
        print(f"写入失败: {e}")

## 创建多个线程同时写入文件
lock_test_file = "lock_test.txt"
if os.path.exists(lock_test_file):
    os.remove(lock_test_file)

threads = []
for i in range(3):
    thread = threading.Thread(
        target=write_with_lock,
        args=(lock_test_file, f"消息来自线程{i}", 0.5),
        name=f"Thread-{i}"
    )
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

## 检查写入结果
if os.path.exists(lock_test_file):
    with open(lock_test_file, "r", encoding="utf-8") as f:
        print(f"文件锁定测试结果:\n{f.read()}")
    os.remove(lock_test_file)

## 2. 临时文件操作
print("\n2. 临时文件操作:")
import tempfile

## 创建临时文件
with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8", delete=False) as temp_file:
    temp_filename = temp_file.name
    temp_file.write("这是临时文件的内容\n")
    temp_file.write("第二行内容")
    print(f"创建临时文件: {temp_filename}")

## 读取临时文件
with open(temp_filename, "r", encoding="utf-8") as f:
    temp_content = f.read()
    print(f"临时文件内容: {temp_content}")

## 手动删除临时文件
os.remove(temp_filename)
print("临时文件已删除")

## 使用上下文管理器自动删除临时文件
with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8") as temp_file:
    temp_file.write("自动删除的临时文件")
    temp_file.seek(0)
    content = temp_file.read()
    print(f"自动删除临时文件内容: {content}")
    print(f"临时文件路径: {temp_file.name}")
## 文件在这里自动删除

## 3. 内存映射文件
print("\n3. 内存映射文件:")
import mmap

## 创建测试文件
test_data = b"Hello, Memory Mapped File! " * 100
with open("mmap_test.txt", "wb") as f:
    f.write(test_data)

## 使用内存映射读取文件
with open("mmap_test.txt", "r+b") as f:
    with mmap.mmap(f.fileno(), 0) as mm:
        print(f"内存映射文件大小: {len(mm)} 字节")
        print(f"前 50 字节: {mm[:50]}")
        
#        # 搜索内容
        pos = mm.find(b"Memory")
        if pos != -1:
            print(f"找到 'Memory' 在位置: {pos}")
        
#        # 修改内容
        mm[0:5] = b"Hi!!!"  # 替换前 5 个字节
        
#        # 读取修改后的内容
        mm.seek(0)
        modified_content = mm.read(50)
        print(f"修改后前 50 字节: {modified_content}")

## 验证修改
with open("mmap_test.txt", "rb") as f:
    content = f.read(50)
    print(f"文件修改验证: {content}")

os.remove("mmap_test.txt")

## 4. 文件监控(简单实现)
print("\n4. 文件监控:")

def monitor_file_changes(filename, check_interval=1, duration=5):
    """Poll a file and report changes to its modification time or size.

    Args:
        filename: Path of the file to watch.
        check_interval: Seconds to sleep between polls.
        duration: Total number of seconds to keep watching.

    Returns:
        None. All findings are printed. Monitoring stops early if the file
        disappears or can no longer be inspected.
    """
    try:
        info = os.stat(filename)
    except OSError:
        print(f"文件 {filename} 不存在")
        return

    last_modified = info.st_mtime
    last_size = info.st_size

    print(f"开始监控文件: {filename}")
    print(f"初始修改时间: {time.ctime(last_modified)}")
    print(f"初始文件大小: {last_size} 字节")

    # A monotonic clock keeps the watch window correct even if the system
    # clock is adjusted while monitoring.
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        try:
            # One stat() call gives a consistent mtime/size pair.
            info = os.stat(filename)
        except OSError:
            # The file was removed or became inaccessible between polls;
            # stop instead of crashing.
            print(f"文件 {filename} 已被删除或无法访问,停止监控")
            break

        if info.st_mtime != last_modified:
            print(f"文件已修改: {time.ctime(info.st_mtime)}")
            last_modified = info.st_mtime

        if info.st_size != last_size:
            print(f"文件大小变化: {last_size} -> {info.st_size} 字节")
            last_size = info.st_size

        time.sleep(check_interval)

    print("监控结束")

## 创建测试文件并启动监控
monitor_test_file = "monitor_test.txt"
with open(monitor_test_file, "w", encoding="utf-8") as f:
    f.write("初始内容")

## 在另一个线程中修改文件
def modify_file():
    """Append two pieces of text to the monitored test file.

    Sleeps 2 seconds before each append so the changes are spread out over
    time. The target path is read from the module-level
    ``monitor_test_file``.
    """
    for addition in ("\n 添加的内容", "\n 更多内容"):
        time.sleep(2)
        with open(monitor_test_file, "a", encoding="utf-8") as f:
            f.write(addition)

modify_thread = threading.Thread(target=modify_file)
modify_thread.start()

## 监控文件(这里简化为短时间)
monitor_file_changes(monitor_test_file, check_interval=0.5, duration=3)

modify_thread.join()
os.remove(monitor_test_file)

## 5. 文件压缩和解压
print("\n5. 文件压缩和解压:")
import gzip
import zipfile

## 创建测试数据
test_content = "这是要压缩的测试内容。" * 100

## gzip 压缩
with open("test_data.txt", "w", encoding="utf-8") as f:
    f.write(test_content)

original_size = os.path.getsize("test_data.txt")

## 使用 gzip 压缩
with open("test_data.txt", "rb") as f_in:
    with gzip.open("test_data.txt.gz", "wb") as f_out:
        f_out.write(f_in.read())

compressed_size = os.path.getsize("test_data.txt.gz")
compression_ratio = (1 - compressed_size / original_size) * 100

print(f"原始文件大小: {original_size} 字节")
print(f"压缩文件大小: {compressed_size} 字节")
print(f"压缩率: {compression_ratio:.2f}%")

## gzip 解压
with gzip.open("test_data.txt.gz", "rb") as f_in:
    with open("test_data_decompressed.txt", "wb") as f_out:
        f_out.write(f_in.read())

## 验证解压结果
with open("test_data_decompressed.txt", "r", encoding="utf-8") as f:
    decompressed_content = f.read()
    print(f"解压验证: {decompressed_content == test_content}")

## ZIP 压缩多个文件
with zipfile.ZipFile("test_archive.zip", "w") as zip_file:
    zip_file.write("test_data.txt")
    zip_file.write("test_data_decompressed.txt")

print(f"ZIP 文件大小: {os.path.getsize('test_archive.zip')} 字节")

## 列出 ZIP 文件内容
with zipfile.ZipFile("test_archive.zip", "r") as zip_file:
    print("ZIP 文件内容:")
    for info in zip_file.infolist():
        print(f"  {info.filename}: {info.file_size} 字节")

## 清理压缩测试文件
compression_test_files = [
    "test_data.txt", "test_data.txt.gz", 
    "test_data_decompressed.txt", "test_archive.zip"
]

for filename in compression_test_files:
    try:
        os.remove(filename)
    except FileNotFoundError:
        pass

## 6. 文件备份和版本控制
print("\n6. 文件备份和版本控制:")
import shutil
from datetime import datetime

def backup_file(filename, backup_dir="backups"):
    """Copy a file into a backup directory under a timestamped name.

    The backup is named ``<stem>_<YYYYmmdd_HHMMSS><ext>``. If that name is
    already taken (two backups within the same second), a numeric suffix
    ``_1``, ``_2``, ... is inserted before the extension so an earlier backup
    is never overwritten.

    Args:
        filename: Path of the file to back up.
        backup_dir: Directory that receives the backup (created if missing).

    Returns:
        The path of the new backup file, or None if ``filename`` does not
        exist.
    """
    if not os.path.exists(filename):
        print(f"文件 {filename} 不存在")
        return None

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(backup_dir, exist_ok=True)

    # Build the backup file name from the source name and the current time.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_name, extension = os.path.splitext(os.path.basename(filename))
    backup_path = os.path.join(backup_dir, f"{base_name}_{timestamp}{extension}")

    # The timestamp only has one-second resolution; pick a free name rather
    # than clobbering a backup made earlier in the same second.
    collision_index = 1
    while os.path.exists(backup_path):
        backup_path = os.path.join(
            backup_dir, f"{base_name}_{timestamp}_{collision_index}{extension}"
        )
        collision_index += 1

    # copy2 copies the content and also tries to preserve file metadata.
    shutil.copy2(filename, backup_path)
    print(f"备份创建: {filename} -> {backup_path}")
    return backup_path

## Create a test file and back it up
test_file = "backup_test.txt"
with open(test_file, "w", encoding="utf-8") as f:
    f.write("原始版本的内容")

backup1 = backup_file(test_file)

## Modify the file and back it up again
time.sleep(1)  # make sure the second backup gets a different timestamp
with open(test_file, "w", encoding="utf-8") as f:
    f.write("修改后的内容")

backup2 = backup_file(test_file)

## List the backup files
if os.path.exists("backups"):
    print("备份文件列表:")
    # The loop variable must not be called backup_file: that would rebind the
    # module-level name and make the backup_file() function uncallable
    # afterwards.
    for backup_name in os.listdir("backups"):
        backup_path = os.path.join("backups", backup_name)
        print(f"  {backup_name}: {os.path.getsize(backup_path)} 字节")

## Clean up the backup test artifacts
if os.path.exists(test_file):
    os.remove(test_file)
if os.path.exists("backups"):
    shutil.rmtree("backups")

print("\n 高级文件操作示例完成")

⚠️ 注意事项

常见陷阱和最佳实践

## open()函数的常见陷阱和最佳实践
print("open()函数常见陷阱和最佳实践:")

## 陷阱 1: 忘记关闭文件
print("\n 陷阱 1: 文件未正确关闭")

## 错误方式
print("错误方式(可能导致资源泄露):")
f = open("sample.txt", "w", encoding="utf-8")
f.write("这个文件可能不会被正确关闭")
## 忘记调用 f.close()

## 正确方式 1: 手动关闭
print("正确方式 1(手动关闭):")
f = open("sample.txt", "w", encoding="utf-8")
try:
    f.write("手动关闭的文件")
finally:
    f.close()

## 正确方式 2: 使用 with 语句(推荐)
print("正确方式 2(with 语句,推荐):")
with open("sample.txt", "w", encoding="utf-8") as f:
    f.write("自动关闭的文件")
## 文件在这里自动关闭

## 陷阱 2: 编码问题
print("\n 陷阱 2: 编码问题")

## 创建包含中文的文件
with open("chinese_test.txt", "w", encoding="utf-8") as f:
    f.write("中文内容测试")

## 错误方式:不指定编码或使用错误编码
try:
    with open("chinese_test.txt", "r") as f:  # 可能使用系统默认编码
        content = f.read()
        print(f"系统默认编码读取: {content}")
except UnicodeDecodeError as e:
    print(f"编码错误: {e}")

## 正确方式:明确指定编码
with open("chinese_test.txt", "r", encoding="utf-8") as f:
    content = f.read()
    print(f"UTF-8 编码读取: {content}")

## Pitfall 3: confusing file modes
# io.UnsupportedOperation is the exception raised for reads/writes the open
# mode does not allow. Without this import the except clauses below would
# raise NameError instead of handling it.
import io

print("\n 陷阱 3: 文件模式混淆")

## Create the test file
with open("mode_test.txt", "w", encoding="utf-8") as f:
    f.write("原始内容")

## Wrong: trying to write in read-only mode
try:
    with open("mode_test.txt", "r", encoding="utf-8") as f:
        f.write("尝试写入")  # fails: the file is not writable
except io.UnsupportedOperation as e:
    print(f"只读模式写入错误: {e}")

## Wrong: trying to read in write-only mode ("w" also truncates the file)
with open("mode_test.txt", "w", encoding="utf-8") as f:
    f.write("新内容")
    try:
        f.seek(0)
        content = f.read()  # fails: the file is not readable
    except io.UnsupportedOperation as e:
        print(f"写入模式读取错误: {e}")

## Right: use a mode that matches the operations
with open("mode_test.txt", "r+", encoding="utf-8") as f:
    original = f.read()
    f.seek(0, 2)  # move to the end of the file
    f.write("\n 追加内容")
    f.seek(0)
    updated = f.read()
    print(f"r+模式操作成功: {len(updated)} > {len(original)}")

## 陷阱 4: 路径问题
print("\n 陷阱 4: 文件路径问题")

## 相对路径 vs 绝对路径
current_dir = os.getcwd()
print(f"当前工作目录: {current_dir}")

## 相对路径
relative_path = "test_file.txt"
with open(relative_path, "w", encoding="utf-8") as f:
    f.write("相对路径文件")

## 绝对路径
absolute_path = os.path.join(current_dir, "test_file_abs.txt")
with open(absolute_path, "w", encoding="utf-8") as f:
    f.write("绝对路径文件")

print(f"相对路径文件: {os.path.exists(relative_path)}")
print(f"绝对路径文件: {os.path.exists(absolute_path)}")

## 路径分隔符问题
print("\n 路径分隔符处理:")
## 错误方式(硬编码路径分隔符)
## wrong_path = "data\\files\\test.txt"  # Windows 风格
## wrong_path = "data/files/test.txt"     # Unix 风格

## 正确方式(使用 os.path.join)
correct_path = os.path.join("data", "files", "test.txt")
print(f"正确的路径: {correct_path}")

## 陷阱 5: 大文件处理
print("\n 陷阱 5: 大文件处理")

## 错误方式:一次性读取大文件
print("错误方式(可能导致内存不足):")
## 创建一个较大的测试文件
large_content = "大文件内容行\n" * 10000
with open("large_file.txt", "w", encoding="utf-8") as f:
    f.write(large_content)

## 这种方式对于非常大的文件可能导致内存问题
with open("large_file.txt", "r", encoding="utf-8") as f:
    all_content = f.read()  # 一次性读取所有内容
    print(f"一次性读取: {len(all_content)} 字符")

## 正确方式:分块处理
print("正确方式(分块处理):")
chunk_size = 1024  # 1KB 块
total_chars = 0
with open("large_file.txt", "r", encoding="utf-8") as f:
    while True:
        chunk = f.read(chunk_size)
        if not chunk:
            break
        total_chars += len(chunk)
#        # 处理 chunk...

print(f"分块处理总字符数: {total_chars}")

## 陷阱 6: 文件存在性检查
print("\n 陷阱 6: 文件存在性检查")

## 错误方式:不检查文件是否存在
try:
    with open("nonexistent_file.txt", "r", encoding="utf-8") as f:
        content = f.read()
except FileNotFoundError as e:
    print(f"文件不存在错误: {e}")

## 正确方式:先检查文件是否存在
filename = "maybe_exists.txt"
if os.path.exists(filename):
    with open(filename, "r", encoding="utf-8") as f:
        content = f.read()
        print(f"文件存在,读取成功")
else:
    print(f"文件 {filename} 不存在")
#    # 可以选择创建文件或采取其他行动
    with open(filename, "w", encoding="utf-8") as f:
        f.write("新创建的文件")
    print(f"已创建文件 {filename}")

## 陷阱 7: 缓冲区问题
print("\n 陷阱 7: 缓冲区问题")

## 创建一个需要实时写入的场景
log_file = "realtime.log"
with open(log_file, "w", encoding="utf-8") as f:
    for i in range(3):
        f.write(f"日志条目 {i+1}\n")
#        # 不刷新缓冲区,内容可能不会立即写入磁盘
        print(f"写入条目 {i+1}(可能还在缓冲区)")
        time.sleep(0.5)

print("\n 使用 flush()强制刷新:")
with open(log_file, "w", encoding="utf-8") as f:
    for i in range(3):
        f.write(f"实时日志条目 {i+1}\n")
        f.flush()  # 强制刷新到磁盘
        print(f"实时写入条目 {i+1}")
        time.sleep(0.5)

## 陷阱 8: 异常处理不当
print("\n 陷阱 8: 异常处理")

## 错误方式:捕获过于宽泛的异常
try:
    with open("test_exception.txt", "r", encoding="utf-8") as f:
        content = f.read()
except Exception as e:  # 过于宽泛
    print(f"发生了某种错误: {e}")

## 正确方式:捕获具体的异常
try:
    with open("test_exception.txt", "r", encoding="utf-8") as f:
        content = f.read()
except FileNotFoundError:
    print("文件不存在,创建新文件")
    with open("test_exception.txt", "w", encoding="utf-8") as f:
        f.write("新创建的文件内容")
except PermissionError:
    print("没有权限访问文件")
except UnicodeDecodeError:
    print("文件编码问题")
except OSError as e:
    print(f"操作系统相关错误: {e}")

## 清理测试文件
test_files = [
    "sample.txt", "chinese_test.txt", "mode_test.txt", 
    "test_file.txt", "test_file_abs.txt", "large_file.txt",
    "maybe_exists.txt", "realtime.log", "test_exception.txt"
]

for filename in test_files:
    try:
        os.remove(filename)
    except FileNotFoundError:
        pass

print("\n 已清理所有测试文件")

🔧 性能优化

文件操作性能对比

## 文件操作性能对比
print("文件操作性能对比:")
import time

## 创建测试数据
test_lines = [f"这是第{i}行测试数据\n" for i in range(10000)]
test_content = "".join(test_lines)

print(f"测试数据: {len(test_lines)} 行, {len(test_content)} 字符")

## 性能测试 1: 不同写入方式
print("\n1. 写入性能对比:")

## 方法 1: 逐行写入
start_time = time.time()
with open("perf_test1.txt", "w", encoding="utf-8") as f:
    for line in test_lines:
        f.write(line)
time1 = time.time() - start_time

## 方法 2: 批量写入
start_time = time.time()
with open("perf_test2.txt", "w", encoding="utf-8") as f:
    f.write(test_content)
time2 = time.time() - start_time

## 方法 3: writelines
start_time = time.time()
with open("perf_test3.txt", "w", encoding="utf-8") as f:
    f.writelines(test_lines)
time3 = time.time() - start_time

print(f"逐行写入: {time1:.6f}秒")
print(f"批量写入: {time2:.6f}秒")
print(f"writelines: {time3:.6f}秒")
print(f"最快方法: {'批量写入' if time2 < min(time1, time3) else 'writelines' if time3 < time1 else '逐行写入'}")

## 性能测试 2: 不同读取方式
print("\n2. 读取性能对比:")

## 方法 1: read()全部读取
start_time = time.time()
with open("perf_test1.txt", "r", encoding="utf-8") as f:
    content = f.read()
time1 = time.time() - start_time

## 方法 2: readlines()
start_time = time.time()
with open("perf_test1.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()
time2 = time.time() - start_time

## 方法 3: 逐行迭代
start_time = time.time()
lines_count = 0
with open("perf_test1.txt", "r", encoding="utf-8") as f:
    for line in f:
        lines_count += 1
time3 = time.time() - start_time

print(f"read()全部: {time1:.6f}秒")
print(f"readlines(): {time2:.6f}秒")
print(f"逐行迭代: {time3:.6f}秒 (处理了{lines_count}行)")

## 性能测试 3: 缓冲区大小影响
print("\n3. 缓冲区大小影响:")

buffer_sizes = [1024, 8192, 65536]  # 1KB, 8KB, 64KB

for buffer_size in buffer_sizes:
    start_time = time.time()
    with open("perf_test1.txt", "r", encoding="utf-8", buffering=buffer_size) as f:
        content = f.read()
    elapsed = time.time() - start_time
    print(f"缓冲区 {buffer_size//1024}KB: {elapsed:.6f}秒")

## 清理性能测试文件
for i in range(1, 4):
    try:
        os.remove(f"perf_test{i}.txt")
    except FileNotFoundError:
        pass

print("\n 性能测试完成")

📚 相关函数和模块

相关内置函数

  • print() - 输出到文件或标准输出
  • input() - 从标准输入读取
  • iter() - 创建迭代器
  • next() - 获取迭代器下一个元素

相关标准库模块

  • os - 操作系统接口
  • os.path - 路径操作
  • pathlib - 面向对象的路径操作
  • io - 核心 I/O 工具
  • tempfile - 临时文件和目录
  • shutil - 高级文件操作
  • glob - 文件名模式匹配
  • csv - CSV 文件读写
  • json - JSON 数据处理
  • pickle - Python 对象序列化

🔗 扩展阅读


最后更新: 2024-01-15

作者: Python 文档工程师

版本: 1.0

讨论与反馈

欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。