1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy import stats
# ============================================
# 控制函数法的Python实现
# ============================================
# 读取数据
data = pd.read_csv("cereal.csv")
# ============================================
# 第一步:第一阶段回归
# ============================================
# 准备变量
X_stage1 = data[['cornprice', 'sugar', 'fiber']]
X_stage1 = sm.add_constant(X_stage1)
y_stage1 = np.log(data['price'])
# 运行第一阶段
stage1_model = sm.OLS(y_stage1, X_stage1).fit()
print("第一阶段回归结果:")
print(stage1_model.summary())
# 提取残差(控制函数)
v_hat = stage1_model.resid
data['v_hat'] = v_hat
# ============================================
# 第二步:第二阶段回归
# ============================================
X_stage2 = data[['log_price', 'sugar', 'fiber', 'v_hat']]
X_stage2 = sm.add_constant(X_stage2)
y_stage2 = np.log(data['sales'])
stage2_model = sm.OLS(y_stage2, X_stage2).fit()
print("\n第二阶段回归结果:")
print(stage2_model.summary())
# ============================================
# 方差修正(完整的方法)
# ============================================
def compute_cf_vcov(stage1, stage2, data):
"""
计算控制函数估计的修正方差矩阵
"""
n = len(data)
k = stage2.params.shape[0]
# 获取残差
residuals = stage2.resid
# 获取第二阶段的X矩阵
X2 = np.column_stack([np.ones(n),
np.log(data['price']),
data['sugar'],
data['fiber'],
data['v_hat']])
# 第一阶段的X矩阵
X1 = np.column_stack([np.ones(n),
np.log(data['cornprice']),
data['sugar'],
data['fiber']])
# 计算投影矩阵
P1 = X1 @ np.linalg.inv(X1.T @ X1) @ X1.T
# 调整后的X矩阵(考虑到v_hat的估计误差)
X2_adj = np.copy(X2)
X2_adj[:, -1] = (np.eye(n) - P1) @ data['v_hat'].values
# 计算修正的方差
XtX_inv = np.linalg.inv(X2.T @ X2)
meat = X2_adj.T @ np.diag(residuals**2) @ X2_adj
vcov = XtX_inv @ meat @ XtX_inv
return vcov
# 计算修正的标准误
vcov_cf = compute_cf_vcov(stage1_model, stage2_model, data)
se_cf = np.sqrt(np.diag(vcov_cf))
# 显示修正的结果
print("\n修正后的标准误:")
for i, name in enumerate(stage2_model.params.index):
t_stat = stage2_model.params[i] / se_cf[i]
p_value = 2 * (1 - stats.t.cdf(np.abs(t_stat), n - k))
print(f"{name}: {stage2_model.params[i]:.4f} ({se_cf[i]:.4f}) [t={t_stat:.3f}, p={p_value:.4f}]")
# ============================================
# 对比OLS(有偏的估计)
# ============================================
X_ols = data[['log_price', 'sugar', 'fiber']]
X_ols = sm.add_constant(X_ols)
ols_model = sm.OLS(y_stage2, X_ols).fit()
print("\nOLS vs 控制函数法对比:")
comparison = pd.DataFrame({
'OLS': ols_model.params,
'CF': stage2_model.params,
'Difference': stage2_model.params - ols_model.params
})
print(comparison)
# ============================================
# 经济学检验
# ============================================
# 1. Hausman检验:OLS和CF估计是否显著不同?
hausman_stat = (stage2_model.params - ols_model.params).T @ \
np.linalg.inv(vcov_cf - ols_model.cov_params()) @ \
(stage2_model.params - ols_model.params)
p_value_hausman = 1 - stats.chi2.cdf(hausman_stat, df=len(stage2_model.params))
print(f"\nHausman检验(CF vs OLS):")
print(f"Test stat = {hausman_stat:.4f}, p-value = {p_value_hausman:.4f}")
# 2. 测试控制函数系数是否显著
lambda_coef = stage2_model.params['v_hat']
lambda_se = se_cf[-1]
t_stat = lambda_coef / lambda_se
p_value = 2 * (1 - stats.t.cdf(np.abs(t_stat), n - k))
print(f"\n控制函数系数检验:")
print(f"系数 = {lambda_coef:.4f}")
print(f"标准误 = {lambda_se:.4f}")
print(f"t统计量 = {t_stat:.3f}")
print(f"p-value = {p_value:.4f}")
if p_value < 0.05:
print("→ 控制函数显著!内生性确实存在。")
else:
print("→ 控制函数不显著。可能没有内生性,或者模型规范有问题。")
# ============================================
# 敏感性分析:添加高阶项
# ============================================
data['v_hat_sq'] = data['v_hat']**2
X_stage2_ext = data[['log_price', 'sugar', 'fiber', 'v_hat', 'v_hat_sq']]
X_stage2_ext = sm.add_constant(X_stage2_ext)
stage2_extended = sm.OLS(y_stage2, X_stage2_ext).fit()
print("\n带高阶项的模型:")
print(stage2_extended.summary())
# F检验:高阶项是否显著?
f_stat = stage2_extended.f_tests([[0, 0, 0, 0, 0, 1]]).statistic[0][0]
p_value = 1 - stats.f.cdf(f_stat, 1, n - k - 1)
print(f"\nv_hat²的F检验:p-value = {p_value:.4f}")
if p_value < 0.05:
print("→ 非线性关系显著。考虑使用扩展模型。")
else:
print("→ 线性假设合理。")
|