exec() - 代码执行函数
执行字符串形式的 Python 代码
exec() - 代码执行函数
📝 概述
exec() 是 Python 中的内置函数,用于执行字符串形式的 Python 代码。与 eval() 不同,exec() 可以执行完整的 Python 语句,包括赋值、循环、函数定义、类定义等。这个函数提供了强大的动态代码执行能力,但同时也带来了严重的安全风险。
⚠️ 安全警告
exec()函数存在极高的安全风险!
- 永远不要对不可信的输入使用 exec()
- 恶意代码可能完全控制系统
- 在生产环境中使用时必须进行严格的安全控制
- 考虑使用更安全的替代方案
🎯 学习目标
- 掌握 exec()函数的基本用法和语法
- 理解 exec()与 eval()的区别
- 学会安全地使用 exec()函数
- 了解 exec()的应用场景和限制
- 掌握动态代码生成和执行技术
📋 前置知识
- Python 基本语法
- 表达式和语句的区别
- 作用域和命名空间的概念
- 异常处理的基本知识
- 安全编程的基本概念
- 面向对象编程基础
🔍 详细内容
基本概念
exec() 函数接受一个字符串参数,将其作为 Python 代码进行执行。它可以执行任何有效的 Python 语句,包括:
- 赋值语句
- 控制流语句(if、for、while 等)
- 函数定义
- 类定义
- 导入语句
- 其他复杂的 Python 代码
语法格式
exec(object, globals=None, locals=None)
参数说明
| 参数名 | 类型 | 必需 | 默认值 | 说明 |
|---|---|---|---|---|
| object | 字符串或代码对象 | 是 | 无 | 要执行的 Python 代码 |
| globals | 字典 | 否 | None | 全局命名空间 |
| locals | 字典 | 否 | None | 局部命名空间 |
返回值
- 类型: None
- 说明: exec()总是返回 None,但执行的代码可能会修改命名空间
💡 代码示例
基本用法
## 基本语句执行
print("基本语句执行:")
## 简单赋值
exec("x = 10")
print(f"x = {x}") # 10
## 多行代码
code = """
y = 20
z = x + y
print(f'x + y = {z}')
"""
exec(code)
## 控制流语句
loop_code = """
for i in range(3):
print(f'循环 {i}')
"""
print("\n 控制流执行:")
exec(loop_code)
## 条件语句
condition_code = """
if x > 5:
print('x 大于 5')
else:
print('x 不大于 5')
"""
print("\n 条件语句执行:")
exec(condition_code)
## 函数定义和调用
function_code = """
def greet(name):
return f'Hello, {name}!'
message = greet('Python')
print(message)
"""
print("\n 函数定义执行:")
exec(function_code)
## 类定义
class_code = """
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
def introduce(self):
return f'我是{self.name},{self.age}岁'
person = Person('张三', 25)
print(person.introduce())
"""
print("\n 类定义执行:")
exec(class_code)
使用自定义命名空间
## 自定义命名空间示例
print("\n 自定义命名空间示例:")
## 创建隔离的全局命名空间
custom_globals = {
'__builtins__': {
'print': print,
'len': len,
'range': range,
'str': str,
'int': int,
'float': float
},
'math_pi': 3.14159,
'math_e': 2.71828
}
## 创建局部命名空间
custom_locals = {}
## 在自定义命名空间中执行代码
code_with_namespace = """
## 定义一些变量
radius = 5
area = math_pi * radius ** 2
## 定义函数
def calculate_circle_area(r):
return math_pi * r ** 2
def calculate_volume(r, h):
base_area = calculate_circle_area(r)
return base_area * h
## 计算结果
results = {
'area': area,
'volume': calculate_volume(3, 10)
}
print(f'圆面积: {area}')
print(f'圆柱体积: {results["volume"]}')
"""
print("在自定义命名空间中执行:")
exec(code_with_namespace, custom_globals, custom_locals)
## 查看执行后的局部变量
print(f"\n 执行后的局部变量:")
for key, value in custom_locals.items():
if not key.startswith('__'):
print(f" {key}: {value}")
## 尝试访问被限制的功能
print(f"\n 安全性测试:")
try:
restricted_code = """
import os
os.system('echo "这不应该执行"')
"""
exec(restricted_code, custom_globals, {})
except Exception as e:
print(f"访问受限功能失败(预期): {e}")
## 动态变量访问
print(f"\n 动态变量访问:")
## 从局部命名空间获取函数
if 'calculate_circle_area' in custom_locals:
calc_func = custom_locals['calculate_circle_area']
result = calc_func(7)
print(f"动态调用函数结果: {result}")
动态代码生成
## 动态代码生成示例
print("\n 动态代码生成示例:")
## 代码生成器类
class CodeGenerator:
"""动态代码生成器"""
def __init__(self):
self.indent_level = 0
self.code_lines = []
def add_line(self, line):
"""添加代码行"""
indent = ' ' * self.indent_level
self.code_lines.append(indent + line)
def indent(self):
"""增加缩进"""
self.indent_level += 1
def dedent(self):
"""减少缩进"""
if self.indent_level > 0:
self.indent_level -= 1
def get_code(self):
"""获取生成的代码"""
return '\n'.join(self.code_lines)
def clear(self):
"""清空代码"""
self.code_lines = []
self.indent_level = 0
## 生成数学函数
def generate_math_function(func_name, operations):
"""生成数学函数"""
generator = CodeGenerator()
# # 函数定义
generator.add_line(f"def {func_name}(x):")
generator.indent()
# # 添加操作
for i, op in enumerate(operations):
if i == 0:
generator.add_line(f"result = x {op}")
else:
generator.add_line(f"result = result {op}")
# # 返回结果
generator.add_line("return result")
return generator.get_code()
## 生成并执行数学函数
math_operations = ['+ 10', '* 2', '- 5']
math_code = generate_math_function('custom_math', math_operations)
print("生成的数学函数代码:")
print(math_code)
print("\n 执行生成的函数:")
exec(math_code)
## 测试生成的函数
test_values = [1, 5, 10, 15]
for val in test_values:
result = custom_math(val)
print(f"custom_math({val}) = {result}")
## 生成类定义
def generate_data_class(class_name, fields):
"""生成数据类"""
generator = CodeGenerator()
# # 类定义
generator.add_line(f"class {class_name}:")
generator.indent()
# # __init__ 方法
params = ', '.join(fields)
generator.add_line(f"def __init__(self, {params}):")
generator.indent()
for field in fields:
generator.add_line(f"self.{field} = {field}")
generator.dedent()
# # __str__ 方法
generator.add_line("def __str__(self):")
generator.indent()
field_strs = [f"'{field}: {{{field}}}'"replace('{field}', f'self.{field}') for field in fields]
fields_format = ' + ", " + '.join(field_strs)
generator.add_line(f"return f'{class_name}(' + {fields_format} + ')'")
generator.dedent()
# # __repr__ 方法
generator.add_line("def __repr__(self):")
generator.indent()
generator.add_line("return self.__str__()")
return generator.get_code()
## 生成并执行数据类
data_fields = ['name', 'age', 'city']
data_class_code = generate_data_class('Person', data_fields)
print(f"\n 生成的数据类代码:")
print(data_class_code)
print(f"\n 执行生成的类:")
exec(data_class_code)
## 测试生成的类
person1 = Person('Alice', 30, 'Beijing')
person2 = Person('Bob', 25, 'Shanghai')
print(f"person1: {person1}")
print(f"person2: {person2}")
配置驱动的代码执行
import json
from typing import Dict, Any, List
## 配置驱动的代码执行系统
class ConfigDrivenExecutor:
"""配置驱动的代码执行器"""
def __init__(self):
self.safe_builtins = {
'print': print,
'len': len,
'range': range,
'enumerate': enumerate,
'zip': zip,
'map': map,
'filter': filter,
'sum': sum,
'max': max,
'min': min,
'abs': abs,
'round': round,
'int': int,
'float': float,
'str': str,
'bool': bool,
'list': list,
'tuple': tuple,
'dict': dict,
'set': set
}
self.execution_context = {
'__builtins__': self.safe_builtins,
'data': {},
'results': {},
'config': {}
}
def load_config(self, config_data: Dict[str, Any]):
"""加载配置"""
self.execution_context['config'] = config_data
def set_data(self, data: Dict[str, Any]):
"""设置数据"""
self.execution_context['data'] = data
def execute_script(self, script: str) -> Dict[str, Any]:
"""执行脚本"""
try:
# # 创建独立的局部命名空间
local_context = {}
# # 执行脚本
exec(script, self.execution_context, local_context)
# # 更新结果
if 'results' in local_context:
self.execution_context['results'].update(local_context['results'])
return local_context
except Exception as e:
raise RuntimeError(f"脚本执行失败: {e}")
def execute_config_scripts(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""执行配置中的脚本"""
self.load_config(config)
results = {}
# # 执行初始化脚本
if 'init_script' in config:
print("执行初始化脚本...")
init_result = self.execute_script(config['init_script'])
results['init'] = init_result
# # 执行数据处理脚本
if 'data_scripts' in config:
results['data_processing'] = []
for i, script in enumerate(config['data_scripts']):
print(f"执行数据处理脚本 {i+1}...")
script_result = self.execute_script(script)
results['data_processing'].append(script_result)
# # 执行清理脚本
if 'cleanup_script' in config:
print("执行清理脚本...")
cleanup_result = self.execute_script(config['cleanup_script'])
results['cleanup'] = cleanup_result
return results
## 测试配置驱动执行
print("\n 配置驱动执行示例:")
executor = ConfigDrivenExecutor()
## 设置测试数据
test_data = {
'numbers': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'names': ['Alice', 'Bob', 'Charlie', 'David'],
'scores': [85, 92, 78, 96]
}
executor.set_data(test_data)
## 配置脚本
config = {
'init_script': '''
## 初始化变量
total_count = 0
processed_items = []
statistics = {}
print("初始化完成")
''',
'data_scripts': [
'''
## 处理数字数据
numbers = data['numbers']
even_numbers = [n for n in numbers if n % 2 == 0]
odd_numbers = [n for n in numbers if n % 2 == 1]
statistics['even_count'] = len(even_numbers)
statistics['odd_count'] = len(odd_numbers)
statistics['total_sum'] = sum(numbers)
statistics['average'] = sum(numbers) / len(numbers)
print(f"处理了 {len(numbers)} 个数字")
print(f"偶数: {even_numbers}")
print(f"奇数: {odd_numbers}")
''',
'''
## 处理名字和分数数据
names = data['names']
scores = data['scores']
## 创建学生记录
students = []
for name, score in zip(names, scores):
grade = 'A' if score >= 90 else 'B' if score >= 80 else 'C'
students.append({
'name': name,
'score': score,
'grade': grade
})
## 统计成绩
grade_counts = {'A': 0, 'B': 0, 'C': 0}
for student in students:
grade_counts[student['grade']] += 1
statistics['students'] = students
statistics['grade_distribution'] = grade_counts
statistics['highest_score'] = max(scores)
statistics['lowest_score'] = min(scores)
print(f"处理了 {len(students)} 个学生记录")
for student in students:
print(f" {student['name']}: {student['score']} ({student['grade']})")
'''
],
'cleanup_script': '''
## 生成最终报告
report = {
'summary': {
'total_numbers': len(data['numbers']),
'total_students': len(data['names']),
'processing_complete': True
},
'statistics': statistics
}
results['final_report'] = report
print("\n=== 最终报告 ===")
print(f"处理数字总数: {report['summary']['total_numbers']}")
print(f"学生总数: {report['summary']['total_students']}")
print(f"数字总和: {statistics['total_sum']}")
print(f"平均值: {statistics['average']:.2f}")
print(f"最高分: {statistics['highest_score']}")
print(f"最低分: {statistics['lowest_score']}")
print(f"成绩分布: {statistics['grade_distribution']}")
print("处理完成!")
'''
}
## 执行配置脚本
print("开始执行配置脚本:")
print("=" * 50)
try:
execution_results = executor.execute_config_scripts(config)
print("\n=== 执行结果 ===")
if 'final_report' in executor.execution_context['results']:
final_report = executor.execution_context['results']['final_report']
print("最终报告已生成")
print(f"统计信息: {len(final_report['statistics'])} 项")
except Exception as e:
print(f"执行失败: {e}")
🚀 高级应用
插件系统
import os
import importlib.util
from typing import Dict, Any, List, Callable
from abc import ABC, abstractmethod
## 插件系统实现
class Plugin(ABC):
"""插件基类"""
@abstractmethod
def get_name(self) -> str:
"""获取插件名称"""
pass
@abstractmethod
def get_version(self) -> str:
"""获取插件版本"""
pass
@abstractmethod
def execute(self, context: Dict[str, Any]) -> Any:
"""执行插件"""
pass
class PluginManager:
"""插件管理器"""
def __init__(self):
self.plugins: Dict[str, Plugin] = {}
self.plugin_code_cache: Dict[str, str] = {}
# # 安全的执行环境
self.safe_globals = {
'__builtins__': {
'print': print,
'len': len,
'range': range,
'enumerate': enumerate,
'zip': zip,
'sum': sum,
'max': max,
'min': min,
'abs': abs,
'round': round,
'int': int,
'float': float,
'str': str,
'bool': bool,
'list': list,
'tuple': tuple,
'dict': dict,
'set': set
},
'Plugin': Plugin
}
def load_plugin_from_code(self, plugin_name: str, plugin_code: str) -> bool:
"""从代码加载插件"""
try:
# # 创建插件命名空间
plugin_namespace = {}
# # 执行插件代码
exec(plugin_code, self.safe_globals, plugin_namespace)
# # 查找插件类
plugin_class = None
for name, obj in plugin_namespace.items():
if (isinstance(obj, type) and
issubclass(obj, Plugin) and
obj != Plugin):
plugin_class = obj
break
if plugin_class is None:
raise ValueError("未找到有效的插件类")
# # 创建插件实例
plugin_instance = plugin_class()
# # 注册插件
self.plugins[plugin_name] = plugin_instance
self.plugin_code_cache[plugin_name] = plugin_code
print(f"插件 '{plugin_name}' 加载成功")
print(f" 名称: {plugin_instance.get_name()}")
print(f" 版本: {plugin_instance.get_version()}")
return True
except Exception as e:
print(f"加载插件 '{plugin_name}' 失败: {e}")
return False
def execute_plugin(self, plugin_name: str, context: Dict[str, Any] = None) -> Any:
"""执行插件"""
if plugin_name not in self.plugins:
raise ValueError(f"插件 '{plugin_name}' 未找到")
plugin = self.plugins[plugin_name]
context = context or {}
try:
return plugin.execute(context)
except Exception as e:
raise RuntimeError(f"执行插件 '{plugin_name}' 失败: {e}")
def list_plugins(self) -> List[Dict[str, str]]:
"""列出所有插件"""
plugin_list = []
for name, plugin in self.plugins.items():
plugin_list.append({
'name': name,
'plugin_name': plugin.get_name(),
'version': plugin.get_version()
})
return plugin_list
def unload_plugin(self, plugin_name: str) -> bool:
"""卸载插件"""
if plugin_name in self.plugins:
del self.plugins[plugin_name]
if plugin_name in self.plugin_code_cache:
del self.plugin_code_cache[plugin_name]
print(f"插件 '{plugin_name}' 已卸载")
return True
return False
## 测试插件系统
print("\n 插件系统示例:")
plugin_manager = PluginManager()
## 数学计算插件
math_plugin_code = '''
class MathPlugin(Plugin):
"""数学计算插件"""
def get_name(self) -> str:
return "数学计算器"
def get_version(self) -> str:
return "1.0.0"
def execute(self, context: dict) -> dict:
numbers = context.get('numbers', [])
if not numbers:
return {'error': '没有提供数字'}
results = {
'count': len(numbers),
'sum': sum(numbers),
'average': sum(numbers) / len(numbers),
'min': min(numbers),
'max': max(numbers),
'even_count': len([n for n in numbers if n % 2 == 0]),
'odd_count': len([n for n in numbers if n % 2 == 1])
}
return results
'''
## 文本处理插件
text_plugin_code = '''
class TextPlugin(Plugin):
"""文本处理插件"""
def get_name(self) -> str:
return "文本处理器"
def get_version(self) -> str:
return "1.1.0"
def execute(self, context: dict) -> dict:
text = context.get('text', '')
if not text:
return {'error': '没有提供文本'}
words = text.split()
results = {
'original_text': text,
'character_count': len(text),
'word_count': len(words),
'line_count': text.count('\\n') + 1,
'uppercase': text.upper(),
'lowercase': text.lower(),
'title_case': text.title(),
'word_frequency': {}
}
# # 计算词频
for word in words:
clean_word = word.lower().strip('.,!?;:')
if clean_word:
results['word_frequency'][clean_word] = results['word_frequency'].get(clean_word, 0) + 1
return results
'''
## 数据分析插件
data_plugin_code = '''
class DataAnalysisPlugin(Plugin):
"""数据分析插件"""
def get_name(self) -> str:
return "数据分析器"
def get_version(self) -> str:
return "2.0.0"
def execute(self, context: dict) -> dict:
data = context.get('data', [])
if not data:
return {'error': '没有提供数据'}
# # 分析不同类型的数据
analysis = {
'total_items': len(data),
'data_types': {},
'numeric_analysis': {},
'string_analysis': {},
'list_analysis': {}
}
# # 统计数据类型
for item in data:
item_type = type(item).__name__
analysis['data_types'][item_type] = analysis['data_types'].get(item_type, 0) + 1
# # 数值分析
numeric_data = [item for item in data if isinstance(item, (int, float))]
if numeric_data:
analysis['numeric_analysis'] = {
'count': len(numeric_data),
'sum': sum(numeric_data),
'average': sum(numeric_data) / len(numeric_data),
'min': min(numeric_data),
'max': max(numeric_data)
}
# # 字符串分析
string_data = [item for item in data if isinstance(item, str)]
if string_data:
analysis['string_analysis'] = {
'count': len(string_data),
'total_length': sum(len(s) for s in string_data),
'average_length': sum(len(s) for s in string_data) / len(string_data),
'longest': max(string_data, key=len),
'shortest': min(string_data, key=len)
}
# # 列表分析
list_data = [item for item in data if isinstance(item, list)]
if list_data:
analysis['list_analysis'] = {
'count': len(list_data),
'total_elements': sum(len(lst) for lst in list_data),
'average_length': sum(len(lst) for lst in list_data) / len(list_data)
}
return analysis
'''
## 加载插件
print("加载插件:")
plugin_manager.load_plugin_from_code('math', math_plugin_code)
plugin_manager.load_plugin_from_code('text', text_plugin_code)
plugin_manager.load_plugin_from_code('data', data_plugin_code)
## 列出插件
print(f"\n 已加载的插件:")
for plugin_info in plugin_manager.list_plugins():
print(f" {plugin_info['name']}: {plugin_info['plugin_name']} v{plugin_info['version']}")
## 测试数学插件
print(f"\n 测试数学插件:")
math_context = {'numbers': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}
math_result = plugin_manager.execute_plugin('math', math_context)
print(f"数学计算结果: {math_result}")
## 测试文本插件
print(f"\n 测试文本插件:")
text_context = {'text': 'Hello World! This is a test text. Hello again!'}
text_result = plugin_manager.execute_plugin('text', text_context)
print(f"文本处理结果:")
for key, value in text_result.items():
if key != 'word_frequency':
print(f" {key}: {value}")
print(f" 词频统计: {text_result['word_frequency']}")
## 测试数据分析插件
print(f"\n 测试数据分析插件:")
data_context = {
'data': [1, 2, 3, 'hello', 'world', [1, 2, 3], [4, 5], 4.5, 'test', [6, 7, 8, 9]]
}
data_result = plugin_manager.execute_plugin('data', data_context)
print(f"数据分析结果:")
for key, value in data_result.items():
print(f" {key}: {value}")
模板引擎
import re
from typing import Dict, Any, List, Optional
## 简单的模板引擎
class TemplateEngine:
"""简单的模板引擎"""
def __init__(self):
# # 模板语法模式
self.variable_pattern = re.compile(r'\{\{\s*([^}]+)\s*\}\}')
self.code_block_pattern = re.compile(r'\{%\s*([^%]+)\s*%\}', re.DOTALL)
self.comment_pattern = re.compile(r'\{#\s*([^#]*)\s*#\}')
# # 安全的执行环境
self.safe_builtins = {
'len': len,
'range': range,
'enumerate': enumerate,
'zip': zip,
'sum': sum,
'max': max,
'min': min,
'abs': abs,
'round': round,
'int': int,
'float': float,
'str': str,
'bool': bool,
'list': list,
'tuple': tuple,
'dict': dict,
'set': set,
'sorted': sorted,
'reversed': reversed
}
def render_template(self, template: str, context: Dict[str, Any]) -> str:
"""渲染模板"""
# # 移除注释
template = self.comment_pattern.sub('', template)
# # 处理代码块
template = self._process_code_blocks(template, context)
# # 处理变量
template = self._process_variables(template, context)
return template
def _process_variables(self, template: str, context: Dict[str, Any]) -> str:
"""处理变量替换"""
def replace_variable(match):
var_expr = match.group(1).strip()
try:
# # 创建安全的执行环境
safe_context = {
'__builtins__': self.safe_builtins,
**context
}
# # 求值变量表达式
result = eval(var_expr, safe_context, {})
return str(result)
except Exception as e:
return f"{{{{ ERROR: {e} }}}}"
return self.variable_pattern.sub(replace_variable, template)
def _process_code_blocks(self, template: str, context: Dict[str, Any]) -> str:
"""处理代码块"""
def replace_code_block(match):
code = match.group(1).strip()
try:
# # 创建输出缓冲区
output_buffer = []
# # 创建安全的执行环境
safe_context = {
'__builtins__': self.safe_builtins,
'output': output_buffer,
'print_to_template': lambda *args: output_buffer.append(' '.join(str(arg) for arg in args)),
**context
}
# # 执行代码块
exec(code, safe_context, {})
# # 返回输出
return '\n'.join(output_buffer)
except Exception as e:
return f"{% ERROR: {e} %}"
return self.code_block_pattern.sub(replace_code_block, template)
def render_file_template(self, template_path: str, context: Dict[str, Any]) -> str:
"""渲染文件模板"""
try:
with open(template_path, 'r', encoding='utf-8') as f:
template = f.read()
return self.render_template(template, context)
except Exception as e:
raise RuntimeError(f"渲染模板文件失败: {e}")
## 测试模板引擎
print("\n 模板引擎示例:")
template_engine = TemplateEngine()
## 简单变量模板
simple_template = """
欢迎 {{ name }}!
您的年龄是 {{ age }} 岁。
您的分数是 {{ score }},等级是 {{ 'A' if score >= 90 else 'B' if score >= 80 else 'C' }}。
"""
simple_context = {
'name': '张三',
'age': 25,
'score': 95
}
print("简单变量模板:")
print(template_engine.render_template(simple_template, simple_context))
## 循环模板
loop_template = """
学生列表:
{%
for i, student in enumerate(students):
grade = 'A' if student['score'] >= 90 else 'B' if student['score'] >= 80 else 'C'
print_to_template(f"{i+1}. {student['name']}: {student['score']} ({grade})")
%}
统计信息:
{% raw %}
- 总人数: {{ len(students) }}
- 平均分: {{ sum(s['score'] for s in students) / len(students) }}
- 最高分: {{ max(s['score'] for s in students) }}
- 最低分: {{ min(s['score'] for s in students) }}
"""
loop_context = {
'students': [
{'name': 'Alice', 'score': 95},
{'name': 'Bob', 'score': 87},
{'name': 'Charlie', 'score': 92},
{'name': 'David', 'score': 78}
]
}
print(f"\n 循环模板:")
print(template_engine.render_template(loop_template, loop_context))
## 条件模板
conditional_template = """
{# 这是注释,不会显示 #}
系统状态报告
==================
{%
if system_status == 'healthy':
print_to_template("✅ 系统运行正常")
print_to_template(f"CPU 使用率: {cpu_usage}%")
print_to_template(f"内存使用率: {memory_usage}%")
else:
print_to_template("❌ 系统异常")
print_to_template(f"错误信息: {error_message}")
%}
服务状态:
{%
for service, status in services.items():
icon = '🟢' if status == 'running' else '🔴'
print_to_template(f"{icon} {service}: {status}")
%}
{%
if alerts:
print_to_template("\n⚠️ 警告信息:")
for alert in alerts:
print_to_template(f"- {alert}")
else:
print_to_template("\n✅ 无警告信息")
%}
"""
conditional_context = {
'system_status': 'healthy',
'cpu_usage': 45,
'memory_usage': 67,
'services': {
'web_server': 'running',
'database': 'running',
'cache': 'stopped',
'queue': 'running'
},
'alerts': [
'缓存服务已停止',
'磁盘空间不足 (85% 已使用)'
]
}
print(f"\n 条件模板:")
print(template_engine.render_template(conditional_template, conditional_context))
## 复杂数据处理模板
complex_template = """
HTML 报告生成
=============
{%
## 数据预处理
total_sales = sum(item['amount'] for item in sales_data)
average_sale = total_sales / len(sales_data) if sales_data else 0
top_products = sorted(sales_data, key=lambda x: x['amount'], reverse=True)[:3]
## 按类别分组
category_sales = {}
for item in sales_data:
category = item['category']
if category not in category_sales:
category_sales[category] = []
category_sales[category].append(item)
## 生成报告
print_to_template(f"销售总额: ${total_sales:,.2f}")
print_to_template(f"平均销售额: ${average_sale:,.2f}")
print_to_template(f"交易数量: {len(sales_data)}")
print_to_template("")
print_to_template("前三名产品:")
for i, product in enumerate(top_products, 1):
print_to_template(f"{i}. {product['name']}: ${product['amount']:,.2f}")
print_to_template("")
print_to_template("按类别统计:")
for category, items in category_sales.items():
category_total = sum(item['amount'] for item in items)
print_to_template(f"{category}: ${category_total:,.2f} ({len(items)} 项)")
%}
"""
complex_context = {
'sales_data': [
{'name': '笔记本电脑', 'category': '电子产品', 'amount': 1299.99},
{'name': '手机', 'category': '电子产品', 'amount': 899.99},
{'name': '办公椅', 'category': '家具', 'amount': 299.99},
{'name': '键盘', 'category': '电子产品', 'amount': 79.99},
{'name': '书桌', 'category': '家具', 'amount': 399.99},
{'name': '显示器', 'category': '电子产品', 'amount': 249.99},
{'name': '台灯', 'category': '家具', 'amount': 49.99}
]
}
print(f"\n 复杂数据处理模板:")
print(template_engine.render_template(complex_template, complex_context))
⚠️ 常见陷阱与最佳实践
安全风险防护
## 安全风险防护示例
print("\n 安全风险防护示例:")
## 安全的 exec 包装器
class SecureExecutor:
"""安全的代码执行器"""
def __init__(self):
# # 危险关键词
self.dangerous_keywords = {
'import', 'exec', 'eval', 'compile', 'open', 'file',
'__import__', '__builtins__', '__globals__', '__locals__',
'globals', 'locals', 'vars', 'dir', 'hasattr', 'getattr',
'setattr', 'delattr', 'input', 'raw_input', 'reload'
}
# # 危险模块
self.dangerous_modules = {
'os', 'sys', 'subprocess', 'shutil', 'tempfile',
'pickle', 'marshal', 'shelve', 'dbm', 'sqlite3',
'socket', 'urllib', 'http', 'ftplib', 'smtplib'
}
# # 安全的内置函数
self.safe_builtins = {
'abs', 'all', 'any', 'bin', 'bool', 'chr', 'dict',
'enumerate', 'filter', 'float', 'hex', 'int', 'len',
'list', 'map', 'max', 'min', 'oct', 'ord', 'range',
'reversed', 'round', 'set', 'sorted', 'str', 'sum',
'tuple', 'zip'
}
def is_safe_code(self, code: str) -> tuple[bool, str]:
"""检查代码是否安全"""
# # 检查危险关键词
for keyword in self.dangerous_keywords:
if keyword in code:
return False, f"包含危险关键词: {keyword}"
# # 检查危险模块
for module in self.dangerous_modules:
if module in code:
return False, f"尝试访问危险模块: {module}"
# # 检查双下划线属性
if '__' in code:
return False, "包含双下划线属性访问"
# # 检查点号访问(可能的属性访问)
if '.' in code and any(dangerous in code for dangerous in ['class', 'base', 'subclass']):
return False, "可能的危险属性访问"
return True, "代码安全"
def create_safe_environment(self, allowed_vars: Dict[str, Any] = None) -> Dict[str, Any]:
"""创建安全的执行环境"""
# # 创建受限的内置函数字典
restricted_builtins = {}
for name in self.safe_builtins:
if hasattr(__builtins__, name):
restricted_builtins[name] = getattr(__builtins__, name)
# # 基础环境
safe_env = {
'__builtins__': restricted_builtins,
'__name__': '__restricted__',
'__doc__': None
}
# # 添加允许的变量
if allowed_vars:
safe_env.update(allowed_vars)
return safe_env
def safe_exec(self, code: str, allowed_vars: Dict[str, Any] = None) -> Dict[str, Any]:
"""安全地执行代码"""
# # 检查代码安全性
is_safe, message = self.is_safe_code(code)
if not is_safe:
raise SecurityError(f"不安全的代码: {message}")
# # 创建安全环境
safe_globals = self.create_safe_environment(allowed_vars)
safe_locals = {}
try:
# # 执行代码
exec(code, safe_globals, safe_locals)
return safe_locals
except Exception as e:
raise RuntimeError(f"代码执行失败: {e}")
## 自定义安全异常
class SecurityError(Exception):
"""安全异常"""
pass
## 测试安全执行器
print("安全执行器测试:")
secure_executor = SecureExecutor()
## 安全代码测试
safe_codes = [
"x = 10\ny = 20\nresult = x + y",
"numbers = [1, 2, 3, 4, 5]\ntotal = sum(numbers)",
"text = 'Hello World'\nlength = len(text)",
"data = [1, 2, 3]\nfiltered = list(filter(lambda x: x > 1, data))"
]
print("\n 安全代码测试:")
for i, code in enumerate(safe_codes, 1):
try:
result = secure_executor.safe_exec(code)
print(f" 测试 {i}: 成功")
print(f" 变量: {list(result.keys())}")
except Exception as e:
print(f" 测试 {i}: 失败 - {e}")
## 危险代码测试
dangerous_codes = [
"import os\nos.system('ls')",
"exec('print(\"hello\")')",
"open('/etc/passwd').read()",
"__import__('subprocess').call(['ls'])",
"[].append.__globals__['__builtins__']['eval']('1+1')",
"().__class__.__bases__[0].__subclasses__()"
]
print(f"\n 危险代码测试:")
for i, code in enumerate(dangerous_codes, 1):
try:
result = secure_executor.safe_exec(code)
print(f" 测试 {i}: 成功(不应该成功!)")
except SecurityError as e:
print(f" 测试 {i}: 被安全机制阻止 - {e}")
except Exception as e:
print(f" 测试 {i}: 其他错误 - {e}")
## 资源限制
print(f"\n 资源限制示例:")
import signal
import time
class ResourceLimitedExecutor(SecureExecutor):
"""资源受限的执行器"""
def __init__(self, timeout: int = 5, max_memory: int = 100*1024*1024): # 100MB
super().__init__()
self.timeout = timeout
self.max_memory = max_memory
def timeout_handler(self, signum, frame):
"""超时处理器"""
raise TimeoutError(f"代码执行超时 ({self.timeout}秒)")
def safe_exec_with_limits(self, code: str, allowed_vars: Dict[str, Any] = None) -> Dict[str, Any]:
"""带资源限制的安全执行"""
# # 设置超时
old_handler = signal.signal(signal.SIGALRM, self.timeout_handler)
signal.alarm(self.timeout)
try:
# # 记录开始时间
start_time = time.time()
# # 执行代码
result = self.safe_exec(code, allowed_vars)
# # 记录执行时间
execution_time = time.time() - start_time
result['__execution_time__'] = execution_time
return result
finally:
# # 恢复信号处理器
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
## 测试资源限制(在支持信号的系统上)
try:
limited_executor = ResourceLimitedExecutor(timeout=2)
# # 正常代码
normal_code = """
result = 0
for i in range(1000):
result += i
"""
print("正常代码执行:")
result = limited_executor.safe_exec_with_limits(normal_code)
print(f" 执行时间: {result.get('__execution_time__', 0):.4f}秒")
print(f" 结果: {result.get('result', 'N/A')}")
# # 可能超时的代码(注释掉以避免实际超时)
# # timeout_code = """
# # import time
# # time.sleep(10) # 这会超时
# # """
#
# # print("\n 超时代码测试:")
# # try:
# # result = limited_executor.safe_exec_with_limits(timeout_code)
# # except TimeoutError as e:
# # print(f" 超时被捕获: {e}")
except Exception as e:
print(f"资源限制测试跳过(可能不支持信号): {e}")
📚 相关函数和模块
内置函数
eval()- 执行 Python 表达式compile()- 编译 Python 代码globals()- 获取全局命名空间locals()- 获取局部命名空间vars()- 获取对象的属性字典dir()- 列出对象的属性hasattr()- 检查属性是否存在getattr()- 获取属性值setattr()- 设置属性值delattr()- 删除属性
标准库模块
ast- 抽象语法树code- 交互式解释器codeop- 编译 Python 代码dis- 字节码反汇编inspect- 对象检查types- 动态类型创建importlib- 导入机制runpy- 运行 Python 模块traceback- 异常跟踪
第三方库
RestrictedPython- 受限的 Python 执行asteval- 安全的表达式求值PyPy- Python 解释器Jinja2- 模板引擎Mako- 模板引擎
📖 扩展阅读
- Python 官方文档
- 安全相关
- Python 代码注入攻击防护
- 沙箱执行环境设计
- 安全编程最佳实践
- 高级应用
- 动态代码生成技术
- 插件系统设计
- 模板引擎实现
🏷️ 标签
代码执行 动态执行 语句执行 安全风险 插件系统 模板引擎 代码生成 沙箱执行 资源限制 安全编程
讨论与反馈
欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。