Pandas 高级功能与数据处理

难度: 高级
pandas 高级功能 数据处理 索引 切片 数据验证

Pandas 高级功能与数据处理

📋 概述

Pandas提供了丰富的高级功能,包括复杂的索引操作、数据验证、格式化输出等。本文档深入介绍这些高级特性,帮助您更高效地处理复杂的数据分析任务。

🎯 学习目标

通过本文档的学习,您将能够:

  • 掌握复杂的数据索引和切片技巧
  • 熟练使用数据验证和清洗方法
  • 了解科学计数法控制和格式化输出
  • 掌握高级数据转换和处理技术
  • 理解性能优化的最佳实践

📚 前置知识

  • Pandas基础操作
  • Pandas数据读写
  • Python高级语法
  • NumPy基础

📖 详细内容

高级索引和切片

多级索引操作

import pandas as pd
import numpy as np

# 创建多级索引DataFrame
arrays = [
    ['A', 'A', 'B', 'B'],
    ['one', 'two', 'one', 'two']
]
index = pd.MultiIndex.from_arrays(arrays, names=['first', 'second'])
df = pd.DataFrame(np.random.randn(4, 2), index=index, columns=['X', 'Y'])
print(df)

# 多级索引选择
print(df.loc['A'])  # 选择第一级索引
print(df.loc[('A', 'one')])  # 选择具体位置
print(df.loc[:, 'X'])  # 选择列

高级切片技巧

# 创建示例DataFrame
df = pd.DataFrame({
    'A': range(10),
    'B': range(10, 20),
    'C': range(20, 30),
    'D': range(30, 40)
}, index=list('abcdefghij'))

# 复杂切片操作
print(df.loc['b':'f', 'A':'C'])  # 行列同时切片
print(df.iloc[1:6, 0:3])  # 位置切片
print(df.loc[df['A'] > 5, ['B', 'D']])  # 条件切片

# 使用query方法
result = df.query('A > 5 and B < 18')
print(result)

ix方法的替代方案

# ix方法已被弃用,使用loc和iloc替代
df = pd.DataFrame(np.arange(42).reshape(7, 6), 
                  columns=list('abcdef'), 
                  index=list('hijklmn'))

# 原ix用法的替代方案
# df.ix['h':'k', 'a':'d'] 替代为:
result1 = df.loc['h':'k', 'a':'d']

# df.ix[1:3, 2:4] 替代为:
result2 = df.iloc[1:3, 2:4]

# 混合索引需要分步操作
result3 = df.loc['h':'k'].iloc[:, 2:4]
print(result1)
print(result2)
print(result3)

数据验证和清洗

空值检测和处理

# 创建包含缺失值的DataFrame
df = pd.DataFrame({
    'A': [1, 2, np.nan, 4, 5],
    'B': [np.nan, 2, 3, np.nan, 5],
    'C': [1, 2, 3, 4, 5],
    'D': [np.nan, np.nan, np.nan, np.nan, np.nan]
})

# 检测空值
print("是否为空:")
print(df.isnull())
print("\n非空值:")
print(df.notnull())

# 统计空值
print("\n每列空值数量:")
print(df.isnull().sum())

# 检测全为空的列
empty_cols = df.columns[df.isnull().all()]
print(f"\n全为空的列: {empty_cols.tolist()}")

# 检测包含空值的列
has_null_cols = df.columns[df.isnull().any()]
print(f"包含空值的列: {has_null_cols.tolist()}")

使用all()和any()进行数据验证

# 创建测试数据
df = pd.DataFrame({
    'score': [85, 92, 0, 96, 88, np.nan],
    'grade': ['A', 'A', 'F', 'A', 'B', 'C'],
    'status': [1, 1, 0, 1, 1, 1]
})

# 查找包含0的列
zero_cols = df.columns[(df == 0).any()]
print(f"包含0的列: {zero_cols.tolist()}")

# 查找全为0的列
all_zero_cols = df.columns[(df == 0).all()]
print(f"全为0的列: {all_zero_cols.tolist()}")

# 查找包含缺失值的列
null_cols = df.columns[df.isnull().any()]
print(f"包含缺失值的列: {null_cols.tolist()}")

# 数据清洗
clean_df = df.dropna()  # 删除包含缺失值的行
print("\n删除缺失值后:")
print(clean_df)

# 填充缺失值
filled_df = df.fillna(df.mean(numeric_only=True))
print("\n填充缺失值后:")
print(filled_df)

格式化和显示控制

科学计数法控制

# 创建包含大数值的DataFrame
df = pd.DataFrame({
    'small': [0.000001, 0.000002, 0.000003],
    'large': [1000000, 2000000, 3000000],
    'normal': [1.23, 4.56, 7.89]
})

print("默认显示:")
print(df)

# 取消科学计数法
pd.set_option('display.float_format', lambda x: '%.6f' % x)
print("\n取消科学计数法:")
print(df)

# 恢复默认设置
pd.reset_option('display.float_format')

# 使用格式化函数
formatted_df = df.applymap(lambda x: f'{x:.2f}' if isinstance(x, float) else x)
print("\n格式化后:")
print(formatted_df)

显示选项设置

# 设置显示选项
pd.set_option('display.max_rows', 100)  # 最大显示行数
pd.set_option('display.max_columns', 20)  # 最大显示列数
pd.set_option('display.width', 1000)  # 显示宽度
pd.set_option('display.precision', 3)  # 浮点数精度

# 查看当前设置
print("当前显示设置:")
print(pd.describe_option('display'))

# 重置所有选项
pd.reset_option('all')

DataFrame与字典转换

多种转换格式

# 创建示例DataFrame
students = pd.DataFrame({
    '姓名': ['张三', '李四', '王五', '赵六'],
    '性别': ['男', '女', '男', '女'],
    '年龄': [20, 21, 19, 18],
    'Python成绩': [70, 80, 90, 50]
})

# 1. 默认格式 {column: {index: value}}
dict_default = students.to_dict()
print("默认格式:")
print(dict_default)

# 2. 列表格式 {column: [values]}
dict_list = students.to_dict(orient='list')
print("\n列表格式:")
print(dict_list)

# 3. 记录格式 [{column: value}, ...]
dict_records = students.to_dict(orient='records')
print("\n记录格式:")
print(dict_records)

# 4. 索引格式 {index: {column: value}}
dict_index = students.to_dict(orient='index')
print("\n索引格式:")
print(dict_index)

# 5. 分割格式 {'index': [...], 'columns': [...], 'data': [...]}
dict_split = students.to_dict(orient='split')
print("\n分割格式:")
print(dict_split)

# 6. Series格式 {column: Series}
dict_series = students.to_dict(orient='series')
print("\nSeries格式:")
print(dict_series)

高级数据处理技巧

条件选择和过滤

# 创建示例数据
df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 28, 32],
    'salary': [50000, 60000, 70000, 55000, 65000],
    'department': ['IT', 'HR', 'IT', 'Finance', 'IT']
})

# 复杂条件筛选
it_high_salary = df[
    (df['department'] == 'IT') & 
    (df['salary'] > 55000) & 
    (df['age'] < 35)
]
print("IT部门高薪年轻员工:")
print(it_high_salary)

# 使用isin进行筛选
selected_names = df[df['name'].isin(['Alice', 'Charlie', 'Eve'])]
print("\n指定员工信息:")
print(selected_names)

# 使用where方法
conditional_df = df.where(df['age'] > 30, '年轻员工')
print("\n条件替换:")
print(conditional_df)

数据类型转换和验证

# 创建混合类型数据
df = pd.DataFrame({
    'int_col': ['1', '2', '3', '4'],
    'float_col': ['1.1', '2.2', '3.3', '4.4'],
    'bool_col': ['True', 'False', 'True', 'False'],
    'date_col': ['2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04']
})

print("原始数据类型:")
print(df.dtypes)

# 批量类型转换
type_mapping = {
    'int_col': 'int64',
    'float_col': 'float64',
    'bool_col': 'bool',
    'date_col': 'datetime64[ns]'
}

df_converted = df.astype(type_mapping)
print("\n转换后数据类型:")
print(df_converted.dtypes)

# 安全的类型转换
def safe_convert(series, target_type):
    """安全的数据类型转换"""
    try:
        return series.astype(target_type)
    except (ValueError, TypeError) as e:
        print(f"转换失败: {e}")
        return series

# 使用pd.to_numeric进行数值转换
numeric_col = pd.to_numeric(df['int_col'], errors='coerce')
print("\n数值转换结果:")
print(numeric_col)

💡 实际应用

基础用法示例

# 数据质量检查工具
def data_quality_report(df):
    """生成数据质量报告"""
    report = {
        '总行数': len(df),
        '总列数': len(df.columns),
        '缺失值统计': df.isnull().sum().to_dict(),
        '数据类型': df.dtypes.to_dict(),
        '重复行数': df.duplicated().sum(),
        '数值列统计': df.describe().to_dict()
    }
    return report

# 使用示例
sample_data = pd.DataFrame({
    'A': [1, 2, np.nan, 4, 2],
    'B': ['x', 'y', 'z', 'x', 'y'],
    'C': [1.1, 2.2, 3.3, 4.4, 5.5]
})

quality_report = data_quality_report(sample_data)
for key, value in quality_report.items():
    print(f"{key}: {value}")

高级用法示例

# 复杂数据处理流水线
class DataProcessor:
    """数据处理流水线"""
    
    def __init__(self, df):
        self.df = df.copy()
        self.processing_log = []
    
    def remove_duplicates(self):
        """删除重复行"""
        before_count = len(self.df)
        self.df = self.df.drop_duplicates()
        after_count = len(self.df)
        self.processing_log.append(f"删除重复行: {before_count - after_count}行")
        return self
    
    def handle_missing_values(self, strategy='drop'):
        """处理缺失值"""
        if strategy == 'drop':
            before_count = len(self.df)
            self.df = self.df.dropna()
            after_count = len(self.df)
            self.processing_log.append(f"删除缺失值: {before_count - after_count}行")
        elif strategy == 'fill':
            numeric_cols = self.df.select_dtypes(include=[np.number]).columns
            self.df[numeric_cols] = self.df[numeric_cols].fillna(self.df[numeric_cols].mean())
            self.processing_log.append("数值列用均值填充缺失值")
        return self
    
    def normalize_columns(self, columns):
        """标准化指定列"""
        for col in columns:
            if col in self.df.columns:
                self.df[col] = (self.df[col] - self.df[col].mean()) / self.df[col].std()
                self.processing_log.append(f"标准化列: {col}")
        return self
    
    def filter_outliers(self, column, method='iqr'):
        """过滤异常值"""
        if method == 'iqr':
            Q1 = self.df[column].quantile(0.25)
            Q3 = self.df[column].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            
            before_count = len(self.df)
            self.df = self.df[
                (self.df[column] >= lower_bound) & 
                (self.df[column] <= upper_bound)
            ]
            after_count = len(self.df)
            self.processing_log.append(f"过滤{column}列异常值: {before_count - after_count}行")
        return self
    
    def get_result(self):
        """获取处理结果"""
        return self.df, self.processing_log

# 使用示例
raw_data = pd.DataFrame({
    'score': [85, 92, 78, 96, 88, 85, 92, 1000, np.nan, 75],  # 包含异常值和缺失值
    'age': [20, 21, 19, 22, 20, 21, 19, 25, 18, np.nan],
    'category': ['A', 'B', 'A', 'C', 'B', 'A', 'B', 'C', 'A', 'B']
})

# 处理流水线
processor = DataProcessor(raw_data)
processed_df, log = (processor
                    .remove_duplicates()
                    .handle_missing_values('drop')
                    .filter_outliers('score')
                    .normalize_columns(['age'])
                    .get_result())

print("处理日志:")
for entry in log:
    print(f"- {entry}")

print("\n处理后数据:")
print(processed_df)

⚠️ 注意事项

  1. 索引对齐:复杂操作时注意索引的对齐和重置
  2. 内存效率:大数据集时考虑使用视图而非副本
  3. 数据类型:转换数据类型前先验证数据的有效性
  4. 链式操作:避免过长的方法链,影响调试和维护
  5. 异常处理:数据处理时要考虑异常情况的处理
  6. 性能优化:对于重复操作,考虑向量化实现

🔗 相关内容

📚 扩展阅读

🏷️ 标签

pandas 高级功能 数据处理 索引 切片 数据验证 格式化 性能优化

作者: Python技术文档工程师

版本: 1.0.0

讨论与反馈

欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。