之前做过一篇决策树基本原理的笔记,连接:https://blog.csdn.net/xyc_undermoon/article/details/90486195
信息论部分可参考吴军博士所著《数学之美》第六章内容,浅显易懂,对基础信息论的理解很有帮助;这次笔记记录一些常见决策树算法以及实现代码。
这里先简单回忆一下决策树:决策树是通过不断地选择特征对目标进行判断,我觉得数学中常用的二分法的思想与决策树有些相似;春节将近,在网上看到一张有意思的图片,感觉大致可以表现决策树的思想,图片附上:
常见模型
首先给定训练数据集T={(x1,y1),(x2,y2),…,(xm,ym)},其中,xi=(xi1,xi2,…,xid)为特征向量,yi∈{1,2,…,k}。基于此数据集生成决策树模型。
常见决策树模型有以下三种:
- ID3:分类算法,使用信息增益准则选择特征,相当于用极大似然法进行概率模型选择,树的形状为多叉树;
- C4.5:分类算法,与ID3算法相似,只是用信息增益比选择特征,同样是多叉树;
- CART:递归构建二叉决策树,既有分类算法也有回归算法。回归树:使用平方误差;分类树:使用基尼指数。
ID3算法
ID3算法的核心是在决策树各个节点上应用信息增益准则选择特征,递归地构建决策树。算法表示如下:
输入:训练数据集D,特征集A,阈值ε;
输出:决策树T
- 若D中所有实例属于同一分类Ck,则T为单结点树,并将分类Ck作为该结点的分类标记,返回T;
- 若A≠∅,则T为单结点树,并将D中实例数最大的分类Ck作为该结点的分类标记,返回T;
- 否则,计算A中各特征对D的信息增益,选择信息增益最大的特征Ag;
- 如果Ag的信息增益小于阈值ε,则置T为单结点树,并将D中实例数最大的分类Ck作为该结点的分类标记,返回T;
- 否则,对Ag的每一种可能值ai,依据Ag=ai将D分割为若干非空子集Di,将Di中实例数最大的分类作为标记,构建子结点,由结点及其子结点构成树T,返回T;
- 对第i个子结点,以Di为训练集,以A-{Ag}为特征集,递归地调用步骤1~3,得到子树Ti,返回Ti。
C4.5算法
C4.5算法在生成决策树的过程中,改用信息增益比来选择特征。算法表示如下:
输入:训练数据集D,特征集A,阈值ε;
输出:决策树T
- 若D中所有实例属于同一分类Ck,则T为单结点树,并将分类Ck作为该结点的分类标记,返回T;
- 若A≠∅,则T为单结点树,并将D中实例数最大的分类Ck作为该结点的分类标记,返回T;
- 否则,计算A中各特征对D的信息增益比,选择信息增益比最大的特征Ag;
- 如果Ag的信息增益比小于阈值ε,则置T为单结点树,并将D中实例数最大的分类Ck作为该结点的分类标记,返回T;
- 否则,对Ag的每一种可能值ai,依据Ag=ai将D分割为若干非空子集Di,将Di中实例数最大的分类作为标记,构建子结点,由结点及其子结点构成树T,返回T;
- 对第i个子结点,一Di为训练集,以A-{Ag}为特征集,递归地调用步骤1~3,得到子树Ti,返回Ti。
CART算法
CART全称是分类与回归树(classification and regression tree),是应用广泛的决策树学习方法。CART算法同样由特征选择、树的生成及剪枝组成,既可用于分类也可用于回归,CART假设决策树是二叉树,内部结点特征的取值为“是”和“否”,左分支取值“是”,右分支取值“否”。
CART算法由以下两部分组成:
- 决策树生成:基于训练数据集生成决策树,生成的决策树要尽量大;
- 决策树剪枝:用验证数据集对已生成的树进行剪枝并选择最优子树,用损失函数最小作为剪枝的标准。
CART生成
CART决策树的生成就是递归地构建二叉决策树的过程。对于回归树,评判标准使用平方误差最小化;对于分类树,使用基尼指数最小化。
1)回归树:
输入:数据集D;
输出:回归树f(x)。
在训练数据集所在的输入空间中,递归地将每个区域划分为两个子区域并决定每个子区域上的输出值,构建二叉决策树:
- 选择最优切分变量j与切分点s,求解
遍历变量j,对固定的切分变量j扫描切分点s,选择使上式达到最小的对(j,s);
- 用选定的对(j,s)划分区域并决定相应的输出值:
- 继续对两个子区域调用步骤1、2,直至满足停止条件;
- 将输入空间划分为M个区域R1,R2,…,RM,生成决策树:
2)分类树:
输入:训练数据集D,停止计算的条件
输出:CART决策树
根据训练数据集,从根结点开始,递归地对每个结点进行以下操作,构建二叉决策树:
- 设结点的训练数据集为D,计算现有特征对该数据集的基尼指数。此时对每一个特征A,对其可能取的每个值a,根据样本点对A=a的测试为“是”或“否”将D分割为D1和D2两部分,利用如下公式计算A=a时的基尼指数
- 在所有可能的特征A以及它们所有的切分点a中,选择基尼指数最小的特征及其对应的切分点作为最优特征与最优切分点,从现结点生成两个子结点,将训练数据集依特征分配到两个子结点中去;
- 对两个子结点递归地调用1、2,直至满足停止条件;
- 生成CART决策树。
算法停止的条件是结点中的样本个数小于预订阈值,或样本集的基尼指数小于预订阈值(样本基本属于同一分类),或没有更多特征。
CART剪枝
CART剪枝算法从“完全生长”的决策树的底端减去一些子树,使决策树变小(即使模型变简单),从而能够对位未知数据有更准确的预测。CART剪枝算法由两步组成:首先从生成算法产生的决策树T0底端开始不断剪枝,直到T0的根结点形成一个子树序列{T0,T1,…,Tn};然后通过交叉验证法在独立的验证数据集上对子树序列进行测试,从中选择最优子树。
1.剪枝:
剪枝过程计算子树的损失函数:
其中,T为任意子树,C(T)为对训练数据的预测误差(例如基尼指数等),|T|为子树的叶结点个数,α为不小于0的参数,衡量训练数据的拟合程度与模型的复杂度。
2.选取最优子树T_α
利用独立的验证数据集,测试子树序列T0,T1,…,Tn中各棵子树的平方误差或基尼指数,最小的为最优模型。
Python实现
# CART算法
class DecisionTree(object):
"""自定的树结构,用来保存决策树.
Paramters:
----------
col: int, default(-1)
当前使用的第几列数据
val: int or float or str, 分割节点
分割节点的值,
int or float : 使用大于进行比较
str : 使用等于模式
LeftChild: DecisionTree
左子树, <= val
RightChild: DecisionTree
右子树, > val
results:
"""
def __init__(self, col=-1, val=None, LeftChild=None, RightChild=None, result=None):
self.col = col
self.val = val
self.LeftChild = LeftChild
self.RightChild = RightChild
self.result = result
class DecisionTreeClassifier(object):
"""使用基尼指数的分类决策树接口.
Paramters:
---------
max_depth : int or None, optional(dafault=None)
表示决策树的最大深度. None: 表示不设置深度,可以任意扩展,
直到叶子节点的个数小于min_samples_split个数.
min_samples_split : int, optional(default=2)
表示最小分割样例数.
if int, 表示最小分割样例树,如果小于这个数字,不在进行分割.
min_samples_leaf : int, optional (default=1)
表示叶节点最少有min_samples_leaf个节点树,如果小于等于这个数,直接返回.
if int, min_samples_leaf就是最小样例数.
min_impurity_decrease : float, optional (default=0.)
分割之后基尼指数大于这个数,则进行分割.
N_t / N * (impurity - N_t_R / N_t * right_impurity
- N_t_L / N_t * left_impurity)
min_impurity_split : float, default=1e-7
停止增长的阈值,小于这个值直接返回.
Attributes
----------
classes_ : array of shape (n_classes,) or a list of such arrays
表示所有的类
feature_importances_ : ndarray of shape (n_features,)
特征重要性, 被选择最优特征的次数,进行降序.
tree_ : Tree object
The underlying Tree object.
"""
def __init__(self,
max_depth=None,
min_samples_split=2,
min_samples_leaf=1,
min_impurity_decrease=0.,
min_impurity_split=1e-7):
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.min_samples_leaf = min_samples_leaf
self.min_impurity_decrease = min_impurity_decrease
self.min_impurity_split = min_impurity_split
self.classes_ = None
self.max_features_ = None
self.decision_tree = None
self.all_feats = None
def fit(self, X, y, check_input=True):
"""使用X和y训练决策树的分类模型.
Parameters
----------
X : {array-like} of shape (n_samples, n_features)
The training input samples. Internally, it will be converted to
``dtype=np.float32``
y : array-like of shape (n_samples,) or (n_samples, n_outputs)
The target values (class labels) as integers or strings.
check_input : bool, (default=True)
Allow to bypass several input checking.
Returns
-------
self : object
Fitted estimator.
"""
if isinstance(X, list):
X = self.__check_array(X)
if isinstance(y, list):
y = self.__check_array(y)
if X.shape[0] != y.shape[0]:
raise ValueError("输入的数据X和y长度不匹配")
self.classes_ = list(set(y))
if isinstance(X, pd.DataFrame):
X = X.values
if isinstance(y, pd.DataFrame):
y = y.values
data_origin = np.c_[X, y]
# print (data_origin)
self.all_feats = [i for i in range(X.shape[1])]
self.max_features_ = X.shape[0]
data = copy.deepcopy(data_origin)
self.decision_tree = self.__build_tree(data, 0)
def __predict_one(self, input_x):
"""预测一个样例的返回结果.
Paramters:
---------
input_x : list or np.ndarray
需要预测输入数据
Returns:
-------
class : 对应的类
"""
tree = self.decision_tree
#============================= show me your code =======================
def run(input_x, tree):
"""内部使用函数
"""
# 叶子节点返回
if tree.result != None:
return tree.result
v = input_x[tree.col]
branch = None
if isinstance(v, int) or isinstance(v, float):
if v <= tree.val:
tree = tree.LeftChild
else:
tree = tree.RightChild
elif isinstance(v, str):
if v == tree.val:
tree = tree.LeftChild
else:
tree = tree.RightChild
return run(input_x, tree)
pre_y = run(input_x, tree)
#============================= show me your code =======================
return pre_y
def predict(self, test):
"""预测函数,
Paramters:
---------
test: {array-like} of shape (n_samples, n_features)
Returns:
result : np.array(list)
"""
result = []
for i in range(len(test)):
result.append(self.__predict_one(test[i]))
return np.array(result)
def score(self, vali_X, vali_y):
"""验证模型的特征,这里使用准确率.
Parameters
----------
vali_X : {array-like} of shape (n_samples, n_features)
The training input samples. Internally, it will be converted to
``dtype=np.float32``
vali_y : array-like of shape (n_samples,) or (n_samples, n_outputs)
The target values (class labels) as integers or strings.
Returns:
-------
score : float, 预测的准确率
"""
vali_y = np.array(vali_y)
pre_y = self.predict(vali_X)
pre_score = 1.0 * sum(vali_y == pre_y) / len(vali_y)
return pre_score
def __build_tree(self, data, depth):
"""创建决策树的主要代码
Paramters:
---------
data : {array-like} of shape (n_samples, n_features) + {label}
The training input samples. Internally, it will be converted to
``dtype=np.float32``
depth: int, 树的深度
Returns:
-------
DecisionTree
"""
labels = np.unique(data[:,-1])
# 只剩下唯一的类别时,停止,返回对应类别
if len(labels) == 1:
return DecisionTree(result=list(labels)[0])
# 遍历完所有特征时,只剩下label标签,就返回出现字数最多的类标签
if not self.all_feats:
return DecisionTree(result=np.argmax(np.bincount(data[:,-1].astype(int))))
# 超过最大深度,则停止,使用出现最多的参数作为该叶子节点的类
if self.max_depth and depth > self.max_depth:
return DecisionTree(result=np.argmax(np.bincount(data[:,-1].astype(int))))
# 如果剩余的样本数大于等于给定的参数 min_samples_split,
# 则不在进行分割, 直接返回类别中最多的类,该节点作为叶子节点
if self.min_samples_split >= data.shape[0]:
return DecisionTree(result=np.argmax(np.bincount(data[:,-1].astype(int))))
# 叶子节点个数小于指定参数就进行返回,叶子节点中的出现最多的类
if self.min_samples_leaf >= data.shape[0]:
return DecisionTree(result=np.argmax(np.bincount(data[:,-1].astype(int))))
# 根据基尼指数选择每个分割的最优特征
best_idx, best_val, min_gini = self.__getBestFeature(data)
# print ("Current best Feature:", best_idx, best_val, min_gini)
# 如果当前的gini指数小于指定阈值,直接返回
if min_gini < self.min_impurity_split:
return DecisionTree(result=np.argmax(np.bincount(data[:,-1].astype(int))))
leftData, rightData = self.__splitData(data, best_idx, best_val)
#============================= show me your code =======================
leftDecisionTree = self.__build_tree(leftData, depth + 1)
rightDecisionTree = self.__build_tree(rightData, depth + 1)
#============================= show me your code =======================
return DecisionTree(col=best_idx, val=best_val, LeftChild=leftDecisionTree, RightChild=rightDecisionTree)
def __getBestFeature(self, data):
"""得到最优特征对应的列
Paramters:
---------
data: np.ndarray
从data中选择最优特征
Returns:
-------
bestInx, val, 最优特征的列的索引和使用的值.
"""
best_idx = -1
best_val = None
min_gini = 1.0
# 遍历现在可以使用的特征列
#============================= show me your code =======================
for feat_idx in self.all_feats:
# 遍历所用的特征:
# 判断数据类型,貌似对numpy.ndarry不好有用
# numpy.ndarry的类型自动向上扩展
x = data[:, feat_idx]
for val in data[:, feat_idx]:
leftData, rightData = self.__splitData(data, feat_idx, val)
left_gini = self.gini(leftData[:, -1])
right_gini = self.gini(rightData[:, -1])
# print (len(leftData), len(rightData), len(data))
cur_gini = 1.0 * len(leftData) / len(data) * left_gini
cur_gini += 1.0 * len(rightData) / len(data) * right_gini
if cur_gini < min_gini:
best_idx = feat_idx
best_val = val
min_gini = cur_gini
#============================= show me your code =======================
# 删除使用过的特征
self.all_feats.remove(best_idx)
return best_idx, best_val, min_gini
def gini(self, labels):
"""计算基尼指数.
Paramters:
----------
labels: list or np.ndarray, 数据对应的类目集合.
Returns:
-------
gini : float ``` Gini(p) = \sum_{k=1}^{K}p_k(1-p_k)=1-\sum_{k=1}^{K}p_k^2 ```
"""
#============================= show me your code =======================
labelSet = np.array(labels)
length = labelSet.shape[0]
gini = 1.
classes = np.unique(labelSet)
for c in classes:
gini -= (1.0 * np.sum(labelSet == c) / length) ** 2
#============================= show me your code =======================
return gini
def __splitData(self, data, featColumn, val):
'''根据特征划分数据集分成左右两部分.
Paramters:
---------
data: np.ndarray, 分割的数据
featColumn : int, 使用第几列的数据进行分割
val : int or float or str, 分割的值
int or float : 使用比较方式
str : 使用相等方式
Returns:
-------
leftData, RightData
int or left: leftData <= val < rightData
str : leftData = val and rightData != val
'''
if isinstance(val, str):
leftData = data[data[:, featColumn] == val]
rightData = data[data[:, featColumn] != val]
elif isinstance(val, int) or isinstance(val, float):
leftData = data[data[:, featColumn] <= val]
rightData = data[data[:, featColumn] > val]
return leftData, rightData
def __check_array(self, X):
"""检查数据类型
Parameters:
----------
X : {array-like} of shape (n_samples, n_features)
The training input samples.
Retures
-------
X: {array-like} of shape (n_samples, n_features)
"""
if isinstance(X, list):
X = np.array(X)
if not isinstance(X, np.ndarray) and not isinstance(X, pd.DataFrame):
raise ValueError("输出数据不合法,目前只支持np.ndarray or pd.DataFrame")
return X
参考资料
- 《统计学习方法》.李航
- 《机器学习》.周志华
- 《数学之美》.吴军
文章评论