The “bread” on either side: \((X^{\intercal}X)^{-1}X^{\intercal}\) on the left and its transpose \(X(X^{\intercal}X)^{-1}\) on the right
The “meat” in the middle: what we assume for \(E[\epsilon\epsilon^{\intercal}]\)
Note that this is the same as the error variance, since \(V[\epsilon]=E[\epsilon\epsilon^{\intercal}]-E[\epsilon]E[\epsilon^{\intercal}]\) and \(E[\epsilon] = 0\)
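Putting the pieces together (using \(\hat{\beta} - \beta = (X^{\intercal}X)^{-1}X^{\intercal}\epsilon\)) gives the familiar sandwich form for the coefficient variance:

\[
V[\hat{\beta}] = \underbrace{(X^{\intercal}X)^{-1}X^{\intercal}}_{\text{bread}}\;\underbrace{E[\epsilon\epsilon^{\intercal}]}_{\text{meat}}\;\underbrace{X(X^{\intercal}X)^{-1}}_{\text{bread}}
\]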
Our coefficient estimates will only be efficient - and our standard errors correct - if these assumptions about the expected error are right! We will explore what happens when the errors are assumed to be homoskedastic, heteroskedastic or clustered.
A good estimate of the vector of heteroskedastic error variances \(\sigma_i^2\) is again obtained by applying the standard formula to the residuals - this time separately for each individual observation:
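As a minimal numpy sketch (the residuals here are illustrative placeholders for those of a fitted model): each observation's variance estimate is its own squared residual, placed on the diagonal of the "meat", with zeros off-diagonal.

```python
import numpy as np

rng = np.random.default_rng(0)
residuals = rng.normal(size=5)   # illustrative residuals from a fitted model

# Heteroskedastic meat: sigma_i^2 is estimated by the squared residual
# of observation i, and covariances are assumed to be zero
ee = np.diagflat(residuals ** 2)
```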
As with homoskedastic errors, the things that impact the standard error of coefficient \(k\) are:
The variance of all the individual errors, \(\sigma_i^2\)
The variance of the feature \(V(X_k)\)
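Written out, plugging the diagonal meat into the sandwich gives the heteroskedasticity-robust (HC0) estimator, where each observation contributes its squared residual times the outer product of its feature vector:

\[
\widehat{V}[\hat{\beta}] = (X^{\intercal}X)^{-1}\left(\sum_{i=1}^{n}\hat{\epsilon}_i^{\,2}\, x_i x_i^{\intercal}\right)(X^{\intercal}X)^{-1}
\]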
Cheese roll: Clustered Errors 🧀
Cluster-robust errors are needed if:
Errors vary for every individual: \(V(\epsilon_i)=\sigma_i^2\) for all observations, i.e. the errors are still heteroskedastic
AND errors within the same cluster \(C_l\) are correlated with each other: \(cov(\epsilon_i,\epsilon_{j}) \neq 0 \text{ if } \epsilon_i,\epsilon_j \in C_l\)
Note - errors between different clusters are still assumed to be uncorrelated, i.e. \(cov(\epsilon_i,\epsilon_{j}) = 0 \text{ if } \epsilon_i \in C_l,\epsilon_j \in C_{m \neq l}\)
What does this look like for \(E[\epsilon\epsilon^{\intercal}]\)?
The diagonal of the matrix holds the variance estimates, which are unique to each observation: \(\sigma_i^2\)
The off-diagonals are populated with the covariances \(cov(\epsilon_i,\epsilon_j)\) - but only where the two observations are in the same cluster
Here is an example where observations 1, 2 and 3 are in cluster A, observations 4 and 5 are in cluster B, observations 6, 7 and 8 are in cluster C etc.
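Writing \(\sigma_{ij}\) for \(cov(\epsilon_i,\epsilon_j)\), the first eight rows and columns of that block-diagonal structure look like:

\[
E[\epsilon\epsilon^{\intercal}] =
\begin{pmatrix}
\sigma_1^2 & \sigma_{12} & \sigma_{13} & 0 & 0 & 0 & 0 & 0 \\
\sigma_{12} & \sigma_2^2 & \sigma_{23} & 0 & 0 & 0 & 0 & 0 \\
\sigma_{13} & \sigma_{23} & \sigma_3^2 & 0 & 0 & 0 & 0 & 0 \\
0 & 0 & 0 & \sigma_4^2 & \sigma_{45} & 0 & 0 & 0 \\
0 & 0 & 0 & \sigma_{45} & \sigma_5^2 & 0 & 0 & 0 \\
0 & 0 & 0 & 0 & 0 & \sigma_6^2 & \sigma_{67} & \sigma_{68} \\
0 & 0 & 0 & 0 & 0 & \sigma_{67} & \sigma_7^2 & \sigma_{78} \\
0 & 0 & 0 & 0 & 0 & \sigma_{68} & \sigma_{78} & \sigma_8^2
\end{pmatrix}
\]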
In this scenario, as well as the variance of the individual errors, \(\sigma_i^2\), and the variance of each feature, \(V(X_k)\), a few additional things impact the standard error of coefficient \(k\):
If errors are correlated within clusters, this will increase the standard error.
If features are correlated within clusters, this will also increase the standard error (due to the off-diagonals in the error variance matrix).
And if the within-cluster error and feature correlations have the same sign, this will increase the standard error further.
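These effects can be made roughly precise: for equal-sized clusters with \(N_g\) observations each, Cameron & Miller (2015, the reference cited in the code below) show the cluster-robust variance of coefficient \(k\) exceeds the default one by approximately the factor

\[
\tau_k \simeq 1 + \rho_{x_k}\,\rho_{\epsilon}\,(N_g - 1)
\]

where \(\rho_{x_k}\) and \(\rho_{\epsilon}\) are the within-cluster correlations of the feature and of the errors. Larger clusters and same-signed correlations inflate the standard error further.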
Coding it from scratch in Python
First we use the OLS parent class we defined previously in the BLUE OLS post:
OLS class (source code):
```python
import numpy as np, pandas as pd
from typing import Optional, Tuple
from scipy.linalg import qr, solve_triangular
from scipy.stats import norm, t


class OLS:
    def _convert_types(self, z) -> np.ndarray:
        """Re-shape and convert y to numpy array to work nicely with rest of functions"""
        if type(z) in [pd.DataFrame, pd.Series]:
            z2 = z.to_numpy()
        elif type(z) == list:
            z2 = np.array(z)
        elif type(z) == np.ndarray:
            z2 = z
        else:
            raise TypeError('Array must be a pandas series/dataframe, numpy array or list')
        return z2

    def _get_y(self, y: Optional[np.ndarray] = None) -> np.ndarray:
        """Re-shape and convert y to numpy array to work nicely with rest of functions"""
        if y is None:
            y = self.y
        return self._convert_types(y).reshape(-1)

    def _get_X(self, X: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]:
        """Re-shape and convert X to numpy array to work nicely with rest of functions
        Also return names for summarising in coefficient table"""
        if X is None:
            X = self.X
        X2 = self._convert_types(X)
        if type(X) == pd.DataFrame:
            exog_names = np.array(X.columns)
        elif type(X) == pd.Series:
            exog_names = np.array(['Unnamed Exog Feature'])
        else:
            exog_names = np.arange(X2.shape[1])
        X2 = X2.reshape(-1, len(exog_names))
        return (X2, exog_names)

    def _clear_fitted_attributes(self):
        """Clears fitted attributes of the class"""
        self.beta, self.RSS, self.beta_se, self.conf_int, self.test_stat, self.p_val = \
            None, None, None, None, None, None

    def __init__(self, y: Optional[np.ndarray] = None, X: Optional[np.ndarray] = None) -> None:
        """Initializes the OLS class to run a least-squares regression"""
        if y is not None:
            self.y = self._get_y(y)
            self.n = len(self.y)
        if X is not None:
            self.X, self.exog_names = self._get_X(X)
            self.k = self.X.shape[1]
        if y is not None and X is not None and len(self.y) != self.X.shape[0]:
            raise ValueError("y and X must be the same size.")
        self._clear_fitted_attributes()

    def _quick_matrix_invert(self, X: np.ndarray) -> np.ndarray:
        """Find the inverse of a matrix, using QR factorization"""
        Q, R = qr(X)
        X_inv = solve_triangular(R, np.identity(X.shape[1])).dot(Q.transpose())
        return X_inv

    def _check_if_fitted(self):
        """Quick helper function that raises an error if the model has not been fitted already"""
        if self.beta is None:
            raise ValueError('Need to fit the model first - run fit()')
        else:
            return True

    def _estimate_ols_coefs(self, y: Optional[np.ndarray] = None, X: Optional[np.ndarray] = None):
        """Estimates the OLS coefficients given a vector y and matrix X"""
        XTX = X.T.dot(X)
        XTY = X.T.dot(y)
        XTX_inv = self._quick_matrix_invert(XTX)
        coefs = XTX_inv.dot(XTY)
        return coefs, XTX_inv

    def fit(self, y: Optional[np.ndarray] = None, X: Optional[np.ndarray] = None):
        """Estimates the OLS coefficients given a vector y and matrix X"""
        self._clear_fitted_attributes()
        y = self._get_y(y)
        X, exog_names = self._get_X(X)
        if y is None or X is None:
            raise ValueError('X and y are required for fitting')
        if len(y) != X.shape[0]:
            raise ValueError("y and X must be the same size.")
        self.y, self.X, self.exog_names = y, X, exog_names
        self.n, self.k = X.shape
        self.DoF = self.n - self.k
        self.beta, self.var_X_inv = self._estimate_ols_coefs(y, X)

    def _assess_fit(self, y: Optional[np.ndarray] = None, X: Optional[np.ndarray] = None) -> float:
        """Returns the unadjusted R^2"""
        self._check_if_fitted()
        if (y is None and X is not None) or (y is not None and X is None):
            raise ValueError('Need to either provide both X and y, '
                             '(or provide neither and R^2 is based on the X and y used for fitting)')
        else:
            y, (X, exog_names) = self._get_y(y), self._get_X(X)
        y_hat = self.predict(self.X)
        residuals = (y - y_hat).reshape(-1, 1)
        RSS = residuals.T.dot(residuals)
        TSS = (y - y.mean()).T.dot(y - y.mean())
        unadj_r_squared = 1 - RSS / TSS
        if (y == self.y).all() and (X == self.X).all():
            self.residuals = residuals
            self.RSS = RSS
            self.TSS = TSS
            self.unadj_r_squared = unadj_r_squared
        return unadj_r_squared

    def _standard_error(self) -> np.ndarray:
        """Returns the standard errors for the coefficients from the fitted model"""
        if self.RSS is None:
            self._check_if_fitted()
            self._assess_fit()
        sigma_sq = self.RSS / float(self.DoF) * np.identity(len(self.beta))
        var_b = sigma_sq.dot(self.var_X_inv)
        self.beta_se = np.sqrt(np.diag(var_b))
        return self.beta_se

    def _confidence_intervals(self, size=0.95):
        """Returns the confidence intervals for the coefficients from the fitted model"""
        if self.beta_se is None:
            self._check_if_fitted()
            self._standard_error()
        alpha = 1 - (1 - size) / 2
        self.conf_int = np.array([
            self.beta - t.ppf(alpha, self.DoF) * self.beta_se,
            self.beta + t.ppf(alpha, self.DoF) * self.beta_se
        ])
        return self.conf_int

    def _test_statistic(self) -> np.ndarray:
        """Returns the test statistics for the coefficients from the fitted model"""
        if self.conf_int is None:
            self._check_if_fitted()
            self.conf_int = self._confidence_intervals()
        self.test_stat = self.beta.flatten() / self.beta_se
        return self.test_stat

    def _p_value(self, z_dist: bool = False) -> np.ndarray:
        """Returns the p-values for the coefficients from the fitted model."""
        if self.test_stat is None:
            self._check_if_fitted()
            self.test_stat = self._test_statistic()
        if z_dist:
            self.p_val = [norm.cdf(-abs(z)) + 1 - norm.cdf(abs(z)) for z in self.test_stat]
        else:
            self.p_val = [2 * t.sf(abs(x), self.DoF) for x in self.test_stat]
        return self.p_val

    def predict(self, X: Optional[np.ndarray] = None) -> np.ndarray:
        """Predict values for y. Returns fitted values if X not provided."""
        self._check_if_fitted()
        X2, exog_names = self._get_X(X)
        y_hat = X2.dot(self.beta)
        if X is None:
            self.y_hat = y_hat
        return y_hat

    def summary(self, z_dist: bool = False) -> pd.DataFrame:
        """Returns the coefficients, standard errors, test statistics and p-values
        in a Pandas DataFrame."""
        if self.p_val is None:
            self._check_if_fitted()
            self._p_value(z_dist)
        summary = pd.DataFrame(
            data={
                'Coefficient': self.beta.flatten(),
                'Standard Error': self.beta_se,
                'Lower bound': self.conf_int[0],
                'Upper bound': self.conf_int[1],
                'test-statistic': self.test_stat,
                'p-value': self.p_val,
            },
            index=self.exog_names,
        )
        return summary
```
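A side note on `_quick_matrix_invert`: since \(X = QR\) with \(Q\) orthogonal and \(R\) upper-triangular, \(X^{-1} = R^{-1}Q^{\intercal}\), and solving the triangular system is cheap. A quick numpy-only check of the same idea (the class above uses the scipy equivalents):

```python
import numpy as np

rng = np.random.default_rng(42)
A = rng.normal(size=(4, 4)) + 4 * np.eye(4)   # a well-conditioned square matrix

# Invert via QR: A = QR, so A^{-1} = R^{-1} Q^T.
# Solving R Z = I only needs back-substitution since R is upper-triangular.
Q, R = np.linalg.qr(A)
A_inv = np.linalg.solve(R, np.identity(4)).dot(Q.T)
```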
Now we can define two new classes:

* A new class, error_covariance, that returns an \(n \times n\) matrix of the estimated errors
* A child ols_sandwich class, that adapts the standard errors of the existing OLS class:
Sandwich class (source code):
```python
from ols_blue import OLS
from scipy.sparse import bsr_array
import pandas as pd, numpy as np
from typing import Optional, Tuple


class error_covariance():
    def _homoskedastic(self) -> np.ndarray:
        """Return an error covariance matrix assuming homoskedasticity"""
        e = self.residuals
        ee = np.identity(self.n) * e.T.dot(e)
        ee /= float(self.DoF)
        return ee

    def _heteroskedastic(self, type: str = 'HC0') -> np.ndarray:
        """Return an error covariance matrix assuming heteroskedasticity (HC0/1)"""
        ee = np.diagflat(self.residuals ** 2)
        if type == 'HC1':
            ee *= self.n / self.DoF
        return ee

    def _clustered(self, clusters: np.ndarray, finite_sample_correction=True) -> np.ndarray:
        """Return an error covariance matrix assuming clustered errors"""
        ee = bsr_array((self.n, self.n)).toarray()
        for i in range(self.n):
            for j in range(self.n):
                if clusters[i] == clusters[j]:
                    ee[i, j] = self.residuals[i] * self.residuals[j]
        n_cl = len(np.unique(clusters))
        n, k = self.n, self.k
        if finite_sample_correction:
            ee *= ((n - 1) / (n - k)) * (n_cl / (n_cl - 1))
        return ee


class ols_sandwich(OLS, error_covariance):
    def __init__(self, y=None, X=None, residuals=None) -> None:
        """The sandwich class adapts the standard errors of existing OLS regressions"""
        super().__init__(y=y, X=X)

    def _XeeX(self, ee: np.ndarray):
        """Return the meat of the sandwich estimator"""
        return self.X.T.dot(ee).dot(self.X)

    def _sandwich(self, meat: np.ndarray, bread: Optional[np.ndarray] = None) -> np.ndarray:
        """Helper function to return a 'sandwich' from bread and meat"""
        if bread is None:
            bread = self.var_X_inv
        sandwich = bread.dot(meat).dot(bread)
        beta_se = np.sqrt(np.diag(sandwich))
        return beta_se

    def _cluster_robust_fast(self, clusters: np.ndarray):
        """Rather than specifying the individual errors before calculating the meat,
        we can instead calculate "mini" meats and sum them up at the end"""
        def cluster_XeeX(cluster_index):
            j = clusters == cluster_index
            _X, _e = self.X[j, :], self.residuals[j]
            _eX = _e.T.dot(_X)
            _XeeX = _eX.T.dot(_eX)
            return _XeeX

        clusters = clusters.flatten()
        cluster_meats = [cluster_XeeX(i) for i in np.unique(clusters)]
        n_cl = len(np.unique(clusters))
        n, k = self.n, self.k
        # Sum XeeX across all clusters, with a finite-sample correction factor.
        # Because the meats are summed (i.e. averaged), many clusters are needed
        # for V to be consistent:
        # https://cameron.econ.ucdavis.edu/research/Cameron_Miller_JHR_2015_February.pdf (p.7-9)
        XeeX = ((n - 1) / (n - k)) * (n_cl / (n_cl - 1)) * np.sum(cluster_meats, axis=0)
        return XeeX

    def _beta_se(self, se_correction: Optional[str | np.ndarray] = None) -> np.ndarray:
        """Return the standard errors of the coefficients"""
        self.beta_se, self.conf_int, self.test_stat, self.p_val = None, None, None, None
        if type(se_correction) == np.ndarray:
            ee = self._clustered(se_correction)
        elif type(se_correction) == str:
            if se_correction == "homoskedastic":
                ee = self._homoskedastic()
            elif se_correction == "heteroskedastic":
                ee = self._heteroskedastic(type='HC1')
            else:
                raise ValueError(f"""Correction type {se_correction} not supported.
                Please choose from 'homoskedastic' or 'heteroskedastic',
                or supply an array for cluster groups""")
        XeeX = self._XeeX(ee)
        beta_se = self._sandwich(XeeX)
        return beta_se

    def summary(self, z_dist: bool = False, se_correction: Optional[str] = None) -> pd.DataFrame:
        """Returns the coefficients, standard errors, test statistics and p-values
        in a Pandas DataFrame."""
        self._check_if_fitted()
        self._assess_fit()
        if se_correction is not None:
            self.beta_se = self._beta_se(se_correction)
        self._p_value(z_dist)
        summary = pd.DataFrame(
            data={
                'Coefficient': self.beta.flatten(),
                'Standard Error': self.beta_se,
                'Lower bound': self.conf_int[0],
                'Upper bound': self.conf_int[1],
                'test-statistic': self.test_stat,
                'p-value': self.p_val,
            },
            index=self.exog_names,
        )
        return summary
```
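To see why the `_cluster_robust_fast` shortcut works: the full meat \(X^{\intercal}\Omega X\) with a block-diagonal \(\Omega\) equals the sum of per-cluster outer products \((X_c^{\intercal}e_c)(X_c^{\intercal}e_c)^{\intercal}\), so the \(n \times n\) matrix never needs to be built. A self-contained numpy check (before any finite-sample correction; data here is illustrative):

```python
import numpy as np

rng = np.random.default_rng(0)
n, k = 9, 2
X = rng.normal(size=(n, k))
e = rng.normal(size=n)                       # illustrative residuals
clusters = np.array([0, 0, 0, 1, 1, 2, 2, 2, 2])

# Full meat: X' ee' X, with the outer product zeroed outside cluster blocks
same_cluster = clusters[:, None] == clusters[None, :]
ee = np.outer(e, e) * same_cluster
meat_full = X.T @ ee @ X

# "Fast" version: sum per-cluster outer products of the score X_c' e_c
meat_fast = np.zeros((k, k))
for c in np.unique(clusters):
    score = X[clusters == c].T @ e[clusters == c]   # shape (k,)
    meat_fast += np.outer(score, score)
```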
Now let’s test it! We will generate some data, run the OLS model, and compare our sandwich estimators to statsmodels:
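The data-generation step isn't shown above, but it might look roughly like the following sketch: clustered features and clustered errors, with `cl` holding the cluster labels. The `our_mod = ols_sandwich(...)` and `stats_mod = sm.OLS(...)` setup lines are shown only as comments here, since they depend on the modules defined above and on statsmodels.

```python
import numpy as np

rng = np.random.default_rng(1)
n_clusters, cluster_size = 50, 10
n = n_clusters * cluster_size

# Cluster labels, plus independent cluster-level shocks for X and the errors
cl = np.repeat(np.arange(n_clusters), cluster_size)
shock_x = rng.normal(size=n_clusters)[cl]
shock_e = rng.normal(size=n_clusters)[cl]

X = np.column_stack([np.ones(n), shock_x + rng.normal(size=n)])
eps = shock_e + rng.normal(size=n)           # errors correlated within clusters
y = X @ np.array([1.0, 2.0]) + eps

beta_hat = np.linalg.lstsq(X, y, rcond=None)[0]   # OLS point estimates

# In the post, the two models would then be set up along the lines of:
#   our_mod = ols_sandwich(y, X); our_mod.fit()
#   stats_mod = sm.OLS(y, X)      # import statsmodels.api as sm
```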
```python
pd.DataFrame(
    np.hstack([
        our_mod.summary(se_correction="homoskedastic")['Standard Error'].to_numpy().reshape(-1, 1),
        stats_mod.fit().bse.reshape(-1, 1)
    ]),
    columns=['Our Standard Errors', 'StatsModel Standard Errors'],
)
```
|   | Our Standard Errors | StatsModel Standard Errors |
|---|---------------------|----------------------------|
| 0 | 0.131679            | 0.131679                   |
| 1 | 0.138426            | 0.138426                   |
And now heteroskedastic:
```python
pd.DataFrame(
    np.hstack([
        our_mod.summary(se_correction='heteroskedastic')['Standard Error'].to_numpy().reshape(-1, 1),
        stats_mod.fit(cov_type='HC1').bse.reshape(-1, 1)
    ]),
    columns=['Our Standard Errors', 'StatsModel Standard Errors'],
)
```
|   | Our Standard Errors | StatsModel Standard Errors |
|---|---------------------|----------------------------|
| 0 | 0.126831            | 0.126831                   |
| 1 | 0.108043            | 0.108043                   |
And finally, cluster-robust:
```python
pd.DataFrame(
    np.hstack([
        our_mod.summary(se_correction=cl)['Standard Error'].to_numpy().reshape(-1, 1),
        stats_mod.fit(cov_type='cluster', cov_kwds={'groups': cl}).bse.reshape(-1, 1)
    ]),
    columns=['Our Standard Errors', 'StatsModel Standard Errors'],
)
```
|   | Our Standard Errors | StatsModel Standard Errors |
|---|---------------------|----------------------------|
| 0 | 0.106352            | 0.106352                   |
| 1 | 0.067777            | 0.067777                   |
Some final reflections
Sandwich estimators correct for the error variance by adjusting the standard errors after the OLS coefficients have been estimated. Another approach is to correct for the variance before estimation - by first applying a weight matrix to the data and then fitting. You can read more about this in the GLS post.