In [1]:
import pandas as pd
import numpy as np
import math
import os

os.chdir("D:/WORK/Dropbox/__ZZD__/ZZD/")
In [2]:
columns = ["class",
           "handicapped-infants",
           "water-project-cost-sharing",
           "adoption-of-the-budget-resolution",
           "physician-fee-freeze",
           "el-salvador-aid",
           "religious-groups-in-schools",
           "anti-satellite-test-ban",
           "aid-to-nicaraguan-contras",
           "mx-missile",
           "immigration",
           "synfuels-corporation-cutback",
           "education-spending",
           "superfund-right-to-sue",
           "crime",
           "duty-free-exports",
           "export-administration-act-south-africa"]

dataset = pd.read_csv("house-votes-84.data", names=columns, index_col=None, na_values="?")
dataset = dataset.dropna().reset_index(drop=True)
X = dataset.drop("class", axis=1)
y = dataset["class"]
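
A quick look at what was loaded (a sketch; the expected shape assumes the standard UCI house-votes-84 file, which has 435 records, 232 of them without missing values):

print(dataset.shape)      # expected (232, 17) after dropping incomplete records
print(y.value_counts())   # class distribution: democrat vs. republican
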
In [3]:
def H(y):
    # Shannon entropy of the class distribution in y (in bits)
    total = len(y)
    result = 0
    for count in y.value_counts():
        if count == 0: continue
        p = count/total
        result -= p * math.log2(p)
    return result


def reduction_of_impurity(y,y_splits,impurity=H):
    # impurity of the parent minus the weighted impurity
    # of the splits; with impurity=H this is the information gain
    result = impurity(y)
    total = len(y)
    for y_split in y_splits:
        result -= len(y_split)/total * impurity(y_split)
    return result

def split_by_values(X,y,col):
    # partition y according to the values of the attribute col
    attr_values = X[col].unique()
    indices = [(X[col] == value) for value in attr_values]
    return [y[index] for index in indices]
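
A quick check of the impurity functions on a toy sample (a minimal sketch; the Series below is made up for illustration):

toy = pd.Series(["yes", "yes", "no", "no"])
print(H(toy))                               # 1.0 bit: two equally likely classes
print(H(pd.Series(["yes", "yes", "yes"]))) # 0.0: a pure sample
# a perfect split removes all impurity, so the gain is 1.0
print(reduction_of_impurity(toy, [toy[:2], toy[2:]]))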

Each node will have an additional attribute prune_lvl, which records the pruning step in which the node was cut off.

In [4]:
class node:

    def __init__(self):

        # attribute used for the decision at this node
        self.attribute = None
        
        # flag: is this a decision (leaf) node?
        self.decisionNode = False
        # the majority class predicted at this node
        self.decision = None
        # pruning step in which the node was cut off (None = not pruned)
        self.prune_lvl = None
        
        # dictionary: attribute value -> child subtree
        self.childs = {}
        self.parent = None
        self.decisionValues = None

A helper function (tree linearization) that builds a list of all nodes, only the leaves, or only the internal nodes, depending on the arguments leaves_only and add_leaves.

It takes prune_lvl into account (pruned-away subtrees do not appear in the resulting list).

In [5]:
def nodes_aux(node,leaves_only=False,add_leaves=True,prune_lvl=0):
        
    if node.decisionNode or (node.prune_lvl and node.prune_lvl < prune_lvl):
        if add_leaves:
            return([node])
        else:
            return([])
            
    else:
        res = []
        
        if not leaves_only: 
            res.append(node)
            
        for c in node.childs.values():
            res.extend(nodes_aux(c,leaves_only=leaves_only,add_leaves=add_leaves,prune_lvl=prune_lvl))
        return(res)
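
A small demonstration of nodes_aux on a hand-built tree (a sketch; the three-node tree below is artificial):

root = node()
root.attribute = "x"
leaf_a, leaf_b = node(), node()
leaf_a.decisionNode = True
leaf_b.decisionNode = True
root.childs = {"a": leaf_a, "b": leaf_b}

print(len(nodes_aux(root)))                    # 3: the root and both leaves
print(len(nodes_aux(root, leaves_only=True)))  # 2: only the leaves
print(len(nodes_aux(root, add_leaves=False)))  # 1: only the internal node

root.prune_lvl = 1                             # pretend the root was pruned in step 1
print(len(nodes_aux(root, prune_lvl=2)))       # 1: a pruned node behaves like a leaf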

The complete code of the decision tree follows.

In [6]:
from copy import deepcopy
from sklearn.metrics import accuracy_score

class decision_tree:
    
    def __init__(self): 
        self.root = None
        
    def stopping_criterion(self,X,y,lvl):
        #X   -- data in the node
        #y   -- their classes
        #lvl -- current depth
        return(y.value_counts().size == 1)
        
    def splitting_criterion(self,y,y_splits):
        return reduction_of_impurity(y,y_splits)

    def predict(self,X,prune_lvl=0,max_depth=-1):
        decisions = []
        for _, row in X.iterrows():
            
            depth = 0
            
            #descend from the root towards a decision node
            actualNode = self.root
            while (not (actualNode.decisionNode 
                        or 
                        (actualNode.prune_lvl and actualNode.prune_lvl < prune_lvl) 
                        or 
                        (depth == max_depth)
                        )):
                
                value = row[actualNode.attribute]
                nextnode = actualNode.childs.get(value)
                
                #if the corresponding subtree does not exist,
                #use the decision of the current node
                if nextnode is None:
                    decisions.append(actualNode.decision)
                    break
                    
                depth = depth + 1
                actualNode = nextnode
            else:
                # while-else: runs only if the loop ended without a break
                decisions.append(actualNode.decision)
        return pd.Series(decisions,dtype="category")
    
    def fit(self,X,y):
        self.root = self.induce_tree(X,y,0)
        return self
    
    def induce_tree(self,X,y,lvl=0):

        result = node()
        result.decisionValues = y.value_counts()
        result.decision = result.decisionValues.index[0]
        
        # if the stopping criterion holds,
        # end the induction (return a leaf node)
        if self.stopping_criterion(X,y,lvl):
            result.decisionNode = True
            return result

        
        # look for the maximal reduction of impurity
        information_gains = []

        for col in X.columns:
            # prepare the splits and
            # compute the splitting criterion
            information_gains.append((col,self.splitting_criterion(y,split_by_values(X,y,col))))

        # C4.5-style heuristic: consider only attributes whose
        # information gain is at least average
        meanIG = np.mean([ig for _, ig in information_gains])

        max_crit = -1
        max_attribute = None


        for col,ig in information_gains:

            if ig > meanIG:
                # gain ratio: normalize the information gain
                # by the entropy of the attribute's values
                crit = ig/H(X[col])

                if crit > max_crit:
                    max_crit = crit
                    max_attribute = col


        # OK, we have the best attribute to split on;
        # if even this attribute does not reduce the impurity
        # (the criterion is ~0), end the induction (create a leaf node)
        if max_crit <= 1e-7:
            result.decisionNode = True
            return result


        # generate the children
        max_attr_values = X[max_attribute].unique()
        max_indices = [(X[max_attribute] == value) for value in max_attr_values]
        
        

        childs = {value : self.induce_tree(X[index] , y[index],lvl+1)  for value,index in zip(max_attr_values, max_indices) }
        result.decisionNode = False
        result.childs = childs
        result.attribute = max_attribute

        return result

    def misclassification_error(self,X,y,prune_lvl=0,max_depth=-1):
        yp = self.predict(X,prune_lvl=prune_lvl,max_depth=max_depth)
        return (1 - accuracy_score(y, yp))
    
    
    # create a deep copy of the tree
    def copy(self):
        newtree = decision_tree()
        newtree.root = deepcopy(self.root)
        return(newtree)

    # wrappers around nodes_aux
    def nodes(self,prune_lvl=0):
        return nodes_aux(self.root,leaves_only=False,prune_lvl=prune_lvl)

    def internal_nodes(self,prune_lvl=0):
        return nodes_aux(self.root,leaves_only=False,add_leaves=False,prune_lvl=prune_lvl)
            
    def leaves(self,prune_lvl=0):
        return nodes_aux(self.root,leaves_only=True,prune_lvl=prune_lvl)
    
    def count_leaves(self,prune_lvl=0):
        return len(self.leaves(prune_lvl=prune_lvl))

    # one pruning step
    def prune_step(self,X,y,lvl):

        # stop if the tree has already been reduced to the root
        # (the root is a leaf or has itself been pruned)
        if self.root.decisionNode or self.root.prune_lvl:
            return
        
        # compute the error estimate, the number of leaves,
        # and the list of internal nodes
        est = self.misclassification_error(X,y,prune_lvl=lvl)
        leaf_count = self.count_leaves()
        nodelist = self.internal_nodes(prune_lvl=lvl)


        # go through all internal nodes and find the one
        # with the lowest value of alpha
        best_to_prune = None
        best_alpha = np.inf

        for n in nodelist:
            # tentatively mark n as pruned at this level
            n.prune_lvl = lvl
            
            pruned_est = self.misclassification_error(X,y,prune_lvl=lvl+1)
            pruned_leaf_count = self.count_leaves(prune_lvl=lvl+1)
            alpha = (pruned_est - est)/(leaf_count-pruned_leaf_count)
        
            if alpha < best_alpha:
                best_to_prune = n
                best_alpha = alpha
        
            # undo the tentative pruning
            n.prune_lvl = None
        
        # mark the node found as pruned
        # at this level
        best_to_prune.prune_lvl = lvl
        return

    # the pruning itself
    def prune(self,X,y,Xp,yp):
        # generate the sequence of pruned
        # decision trees (pruning levels)
        
        lvl = 1
        while(not self.root.prune_lvl):
            self.prune_step(X,y,lvl)
            lvl += 1


        best_error = 1
        best_lvl = None
        
        # pick the pruning level with the lowest
        # error on the pruning set
        for plvl in range(0,lvl+1):
            error = self.misclassification_error(Xp,yp,prune_lvl=plvl)
            
            # '>=' prefers the more heavily pruned tree on ties
            if best_error >= error:
                best_lvl = plvl
                best_error = error
                
        return best_lvl
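
The pruning step above corresponds to the weakest-link step of CART-style cost-complexity pruning: for every internal node it computes

alpha = (err(pruned tree) - err(full tree)) / (leaves(full tree) - leaves(pruned tree)),

i.e. the increase in error per removed leaf, and collapses the node with the smallest alpha. prune then generates the whole sequence of pruning levels until only the root remains, and picks the level with the lowest error on the separate pruning set (Xp, yp).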

An experiment with the maximum depth. With max_depth=0, predict returns the majority-class decision stored in the root, which explains the large error on the first line.

In [7]:
from sklearn.model_selection import train_test_split

Xt, Xp, yt, yp = train_test_split(X, y, test_size=0.2)

DT = decision_tree()    
DT.fit(Xt,yt)

for max_d in range(7):
    print("max_depth:", max_d, "  error:", DT.misclassification_error(Xp,yp,max_depth=max_d))
max_depth: 0   error: 0.44680851063829785
max_depth: 1   error: 0.0
max_depth: 2   error: 0.0
max_depth: 3   error: 0.0
max_depth: 4   error: 0.0
max_depth: 5   error: 0.021276595744680882
max_depth: 6   error: 0.021276595744680882
In [8]:
pl = DT.prune(Xt, yt, Xp, yp)
print(pl)
8
In [9]:
DT.misclassification_error(Xp,yp,prune_lvl=pl)
Out[9]:
0.0
In [10]:
print("#nodes:")
print("original tree:", len(DT.nodes()))
print("prunded tree:", len(DT.nodes(prune_lvl=pl)))
#nodes:
original tree: 33
pruned tree: 3