def induce_tree(self,X,y,lvl=0):
result = node()
result.decisionValues = y.value_counts()
result.decision = result.decisionValues.index[0]
# pokud nastalo kriterium zastaveni
# ukoncime indukci (vratime listovy uzel)
if self.stopping_criterion(X,y,lvl):
result.decisionNode = True
return result;
# TADY ZACINAJI ZMENY
# hledame prumer IG
information_gains = []
for col in X.columns:
# pripravime splity a
# vypocteme IG
information_gains.append((col,self.splitting_criterion(y,split_by_values(X,y,col))))
meanIG = np.mean( [ig for _,ig in information_gains])
max_crit = -1
max_attribute = None
for col,ig in information_gains:
# pro attributy s nadprumernym IG
if ig > meanIG:
# vypocteme Gain Ratio
crit = ig/H(X[col])
# hledame nejlepsi
if crit > max_crit:
max_crit = crit
max_attribute = col
# TADY KONCI ZMENY
if max_crit <= 0.0000001:
result.decisionNode = True
return result;
# vygenerujeme potomky
max_attr_values = X[max_attribute].unique()
max_indices = [(X[max_attribute] == value) for value in max_attr_values]
childs = {value : self.induce_tree(X[index] , y[index],lvl+1) for value,index in zip(max_attr_values, max_indices) }
result.decisionNode = False
result.childs = childs
result.attribute = max_attribute
return result
class node:
def __init__(self):
# priznak jestli jde o interni uzel [False]
# nebo o listovy (rozhodovaci) uzel [True]
self.decisionNode = False
# attribut, podle ktereho se rozhodujeme
# internim uzlu
self.attribute = None
# cilovy attribut -- budeme jej uvazovat
# nejen v
self.decision = None
# pocty pripadu z trenovacich dat
# ktere byly pouzity pri indukci tohoho
# uzlu
self.decisionValues = None
# potomci a predek
self.childs = {}
self.parent = None
# TADY JE ZMENA!!!
self.prune_lvl = None
Budeme potřebovat pomocnou funkci, která projde strom a vrátí nám všechny rozhodovací uzly (listy), nebo všechny interní uzly, nebo obojí -- to v závislosti na argumentech leaves_only a add_leaves. Navíc bude brát argument prune_lvl, který určuje, která úroveň ořezání je aktualní -- tedy které interní uzly s nastaveným prune_lvl má uvažovat jako listy.
def nodes_aux(node,leaves_only=False,add_leaves=True,prune_lvl=0):
if node.decisionNode or (node.prune_lvl and node.prune_lvl < prune_lvl):
if add_leaves:
return([node])
else:
return([])
else:
res = []
if not leaves_only:
res.append(node)
for c in node.childs.values():
res.extend(nodes_aux(c,leaves_only=leaves_only,add_leaves=add_leaves,prune_lvl=prune_lvl))
return(res)
Tuto funkci využijeme v odpovídajících metodách stromu:
def nodes(self,prune_lvl=0):
return nodes_aux(self.root,leaves_only=False,prune_lvl=prune_lvl)
def internal_nodes(self,prune_lvl=0):
return nodes_aux(self.root,leaves_only=False,add_leaves=False,prune_lvl=prune_lvl)
def leaves(self,prune_lvl=0):
return nodes_aux(self.root,leaves_only=True,prune_lvl=prune_lvl)
def count_leaves(self,prune_lvl=0):
return len(self.leaves(prune_lvl=prune_lvl))
Metodu predict rozhodovacího stromu upravíme tak, aby také uvažovala prune_lvl a tedy uvažovala uzly uzly s nižší hodnotou své vlastnosti prune_lvl za ořezané (tedy za listy).
def predict(self,X,prune_lvl=0):
decisions = []
for _, row in X.iterrows():
actualNode = self.root
# TADY JE JEDINA ZMENA:
while (not (actualNode.decisionNode or (actualNode.prune_lvl and actualNode.prune_lvl < prune_lvl))):
value = row[actualNode.attribute]
nextnode = actualNode.childs.get(value)
if nextnode == None:
decisions.append(actualNode.decision)
break;
actualNode = nextnode;
else:
decisions.append(actualNode.decision)
return pd.Series(decisions,dtype="category")
Dále budeme potřebovat odhad chyby, opět vzhledem k prune_lvl
def misclassification_error(self,X,y,prune_lvl=0):
yp = self.predict(X,prune_lvl=prune_lvl)
return (1 - accuracy_score(y, yp))
Nyní máme vše abychom vytvořili sekvenci stromů (fáze 1 z cost complexity pruning). Následující metoda najde nejvhodnější vnitřní uzel k oříznutí a ořeže jej (nastaví mu prune_lvl).
def prune_step(self,X,y,lvl):
#Pokud už jsme ořezaní až na kořen, k ořezání nedojde
if self.root.decisionNode or self.root.prune_lvl:
return
#Vypočteme odhad chyby před ořezáním
est = self.misclassification_error(X,y,prune_lvl=lvl)
# a zjistíme počet listů
leaf_count = self.count_leaves()
best_to_prune = None
best_alpha = np.Infinity
nodelist = self.internal_nodes(prune_lvl=lvl)
for n in nodelist:
# každý interní uzel zkusíme ořezat
n.prune_lvl = lvl
# a vypočíst nejlepší alpha
pruned_est = self.misclassification_error(X,y,prune_lvl=lvl+1)
pruned_leaf_count = self.count_leaves(prune_lvl=lvl+1)
alpha = (pruned_est - est)/(leaf_count-pruned_leaf_count)
#to nejlepší (= nejnižší) alpha a odpovidající uzel si zapamatujeme
if alpha < best_alpha:
best_to_prune = n
best_alpha = alpha
n.prune_lvl = None
# a finálně ořežeme
best_to_prune.prune_lvl = lvl
return
Právě popsanou metodu použijeme k vytvoření celé sekvence (fyzicky jde ale pořád o jeden strom); vypočítame odhad chyby v různých úrovních prune_lvl (podle (Xp,yp)) a najdeme tu nejlepší úroveň.
def prune(self,X,y,Xp,yp):
# generate ...
lvl = 1
while(not self.root.prune_lvl):
self.prune_step(X,y,lvl)
lvl += 1
best_error = 1
best_lvl = None;
for plvl in range(0,lvl+1):
error = self.misclassification_error(Xp,yp,prune_lvl=plvl)
if best_error >= error:
best_lvl = plvl
best_error = error
return best_lvl;
Celý kód třídy decision_tree pak vypadá takto:
import pandas as pd
import numpy as np
import math
from sklearn.metrics import accuracy_score
class decision_tree:
def __init__(self):
self.root = None
def stopping_criterion(self,X,y,lvl):
#X -- data v uzlu
#y -- jejich tridy
#lvl -- v jake jsme hloubce
return(y.value_counts().size == 1)
def splitting_criterion(self,y,y_splits):
return reduction_of_impurity(y,y_splits)
def predict(self,X,prune_lvl=0):
decisions = []
for _, row in X.iterrows():
#sestupujeme z korene k rozhodovacimu uzlu
actualNode = self.root
while (not (actualNode.decisionNode or (actualNode.prune_lvl and actualNode.prune_lvl < prune_lvl))):
value = row[actualNode.attribute]
nextnode = actualNode.childs.get(value)
#pokud odpovidajici podstrom neexistuje
#pouzijeme decision z
if nextnode == None:
decisions.append(actualNode.decision)
break;
actualNode = nextnode;
else:
decisions.append(actualNode.decision)
return pd.Series(decisions,dtype="category")
def fit(self,X,y):
self.root = self.induce_tree(X,y,0)
return self
def induce_tree(self,X,y,lvl=0):
result = node()
result.decisionValues = y.value_counts()
result.decision = result.decisionValues.index[0]
# pokud nastalo kriterium zastaveni
# ukoncime indukci (vratime listovy uzel)
if self.stopping_criterion(X,y,lvl):
result.decisionNode = True
return result;
# hledame
information_gains = []
for col in X.columns:
# pripravime splity a
# vypocteme kriterium splitu
information_gains.append((col,self.splitting_criterion(y,split_by_values(X,y,col))))
meanIG = np.mean( [ig for _,ig in information_gains])
max_crit = -1
max_attribute = None
for col,ig in information_gains:
if ig > meanIG:
# pripravime splity a
# vypocteme kriterium splitu
crit = ig/H(X[col])
if crit > max_crit:
max_crit = crit
max_attribute = col
# OK, mame nejlepsi attribut na split
# pokud ani tento attribut nesnizuje necistotu (kriterium je 0)
# ukoncime indukci (vytvorime listovy uzel)
if max_crit <= 0.0000001:
result.decisionNode = True
return result;
# vygenerujeme potomky
max_attr_values = X[max_attribute].unique()
max_indices = [(X[max_attribute] == value) for value in max_attr_values]
childs = {value : self.induce_tree(X[index] , y[index],lvl+1) for value,index in zip(max_attr_values, max_indices) }
result.decisionNode = False
result.childs = childs
result.attribute = max_attribute
return result
def misclassification_error(self,X,y,prune_lvl=0):
yp = self.predict(X,prune_lvl=prune_lvl)
return (1 - accuracy_score(y, yp))
def copy(self):
newtree = decision_tree()
newtree.root = self.root.copy()
return(newtree)
def nodes(self,prune_lvl=0):
return nodes_aux(self.root,leaves_only=False,prune_lvl=prune_lvl)
def internal_nodes(self,prune_lvl=0):
return nodes_aux(self.root,leaves_only=False,add_leaves=False,prune_lvl=prune_lvl)
def leaves(self,prune_lvl=0):
return nodes_aux(self.root,leaves_only=True,prune_lvl=prune_lvl)
def count_leaves(self,prune_lvl=0):
return len(self.leaves(prune_lvl=prune_lvl))
#TODO
def prune_step(self,X,y,lvl):
if self.root.decisionNode or self.root.prune_lvl:
return
est = self.misclassification_error(X,y,prune_lvl=lvl)
leaf_count = self.count_leaves()
nodelist = self.internal_nodes(prune_lvl=lvl)
best_to_prune = None
best_alpha = np.Infinity
for n in nodelist:
n.prune_lvl = lvl
pruned_est = self.misclassification_error(X,y,prune_lvl=lvl+1)
pruned_leaf_count = self.count_leaves(prune_lvl=lvl+1)
alpha = (pruned_est - est)/(leaf_count-pruned_leaf_count)
if alpha < best_alpha:
best_to_prune = n
best_alpha = alpha
n.prune_lvl = None
best_to_prune.prune_lvl = lvl
return
def prune(self,X,y,Xp,yp):
# generate ...
lvl = 1
while(not self.root.prune_lvl):
self.prune_step(X,y,lvl)
lvl += 1
best_error = 1
best_lvl = None;
for plvl in range(0,lvl+1):
error = self.misclassification_error(Xp,yp,prune_lvl=plvl)
if best_error >= error:
best_lvl = plvl
best_error = error
return best_lvl;
Můžeme testnout:
import os
from sklearn.model_selection import train_test_split
os.chdir("...")
columns = ["class",
"handicapped-infants",
"water-project-cost-sharing",
"adoption-of-the-budget-resolution",
"physician-fee-freeze",
"el-salvador-aid",
"religious-groups-in-schools",
"anti-satellite-test-ban",
"aid-to-nicaraguan-contras",
"mx-missile",
"immigration",
"synfuels-corporation-cutback",
"education-spending",
"superfund-right-to-sue",
"crime",
"duty-free-exports",
"export-administration-act-south-africa"]
dataset = pd.read_csv("c:/Users/Babylon/Dropbox/_TODO/2018_02/ZZD/house-votes-84.data", names=columns, index_col=None,na_values="?")
dataset = dataset.dropna().reset_index(drop=True)
X = dataset.drop("class", axis=1)
y = dataset["class"]
Xt, Xp, yt, yp = train_test_split(X, y, test_size=0.2)
DT = decision_tree()
DT.fit(X,y)
lvl = DT.prune(Xt,yt,Xp,yp)
print(lvl)
print(DT.misclassification_error(Xt,yt,prune_lvl=lvl))
print(DT.misclassification_error(Xp,yp,prune_lvl=lvl))