Rozhodovací stromy 2

normalizace kriteria

přímočaře upravíme metodu induce_tree -- viz komentáře v kódu

In [ ]:
    def induce_tree(self,X,y,lvl=0):
        
        result = node()
        result.decisionValues = y.value_counts()
        result.decision = result.decisionValues.index[0]

        # pokud nastalo kriterium zastaveni
        # ukoncime indukci (vratime listovy uzel)
        if self.stopping_criterion(X,y,lvl):
            result.decisionNode = True
            return result;

        
        # TADY ZACINAJI ZMENY
        # hledame prumer IG
        information_gains = []

        for col in X.columns:
            # pripravime splity a
            # vypocteme IG
            information_gains.append((col,self.splitting_criterion(y,split_by_values(X,y,col))))

        meanIG = np.mean( [ig for _,ig in information_gains])


        max_crit = -1
        max_attribute = None

        for col,ig in information_gains:

            # pro attributy s nadprumernym IG
            if ig > meanIG:
                # vypocteme Gain Ratio
                crit = ig/H(X[col])
                
                # hledame nejlepsi
                if crit > max_crit:
                    max_crit = crit
                    max_attribute = col


        # TADY KONCI ZMENY
        if max_crit <= 0.0000001:
            result.decisionNode = True
            return result;


        # vygenerujeme potomky 
        max_attr_values = X[max_attribute].unique()
        max_indices = [(X[max_attribute] == value) for value in max_attr_values]
        
        

        childs = {value : self.induce_tree(X[index] , y[index],lvl+1)  for value,index in zip(max_attr_values, max_indices) }
        result.decisionNode = False
        result.childs = childs
        result.attribute = max_attribute

        return result

ořezávání (pruning)

cost complexity pruning

Nebudeme fyzicky vytvářet sekvenci stromů, ale uzlům dáme atribut prune_lvl, který (pokud není None) bude říkat, kdy byl uzel ořezán.

In [ ]:
class node:

    def __init__(self):

        # priznak jestli jde o interni uzel [False] 
        # nebo o listovy (rozhodovaci) uzel [True]
        self.decisionNode = False

        # attribut, podle ktereho se rozhodujeme
        # internim uzlu
        self.attribute = None
        
        # cilovy attribut -- budeme jej uvazovat 
        # nejen v 
        self.decision = None
        
        # pocty pripadu z trenovacich dat
        # ktere byly pouzity pri indukci tohoho
        # uzlu
        self.decisionValues = None
        
        # potomci a predek
        self.childs = {}
        self.parent = None
        
        # TADY JE ZMENA!!!
        self.prune_lvl = None

Budeme potřebovat pomocnou funkci, která projde strom a vrátí nám všechny rozhodovací uzly (listy), nebo všechny interní uzly, nebo obojí -- to v závislosti na argumentech leaves_only a add_leaves. Navíc bude brát argument prune_lvl, který určuje, která úroveň ořezání je aktualní -- tedy které interní uzly s nastaveným prune_lvl má uvažovat jako listy.

In [ ]:
def nodes_aux(node,leaves_only=False,add_leaves=True,prune_lvl=0):
        
    if node.decisionNode or (node.prune_lvl and node.prune_lvl < prune_lvl):
        if add_leaves:
            return([node])
        else:
            return([])
            
    else:
        res = []
        
        if not leaves_only: 
            res.append(node)
            
        for c in node.childs.values():
            res.extend(nodes_aux(c,leaves_only=leaves_only,add_leaves=add_leaves,prune_lvl=prune_lvl))
        return(res)

Tuto funkci využijeme v odpovídajících metodách stromu:

In [ ]:
    def nodes(self,prune_lvl=0):
        return nodes_aux(self.root,leaves_only=False,prune_lvl=prune_lvl)

    def internal_nodes(self,prune_lvl=0):
        return nodes_aux(self.root,leaves_only=False,add_leaves=False,prune_lvl=prune_lvl)
            
    def leaves(self,prune_lvl=0):
        return nodes_aux(self.root,leaves_only=True,prune_lvl=prune_lvl)
    
    def count_leaves(self,prune_lvl=0):
        return len(self.leaves(prune_lvl=prune_lvl))

Metodu predict rozhodovacího stromu upravíme tak, aby také uvažovala prune_lvl a tedy uvažovala uzly uzly s nižší hodnotou své vlastnosti prune_lvl za ořezané (tedy za listy).

In [ ]:
    def predict(self,X,prune_lvl=0):
        decisions = []
        for  _, row in X.iterrows():
            
            actualNode = self.root
            
            # TADY JE JEDINA ZMENA:
            while (not (actualNode.decisionNode or (actualNode.prune_lvl and actualNode.prune_lvl < prune_lvl))):
                
                value = row[actualNode.attribute]
                nextnode = actualNode.childs.get(value)
                
                if nextnode == None:
                    decisions.append(actualNode.decision)
                    break;
                    
                actualNode = nextnode;
            else:    
                decisions.append(actualNode.decision)        
        return pd.Series(decisions,dtype="category")

Dále budeme potřebovat odhad chyby, opět vzhledem k prune_lvl

In [ ]:
    def misclassification_error(self,X,y,prune_lvl=0):
        yp = self.predict(X,prune_lvl=prune_lvl)
        return (1 - accuracy_score(y, yp))

Nyní máme vše abychom vytvořili sekvenci stromů (fáze 1 z cost complexity pruning). Následující metoda najde nejvhodnější vnitřní uzel k oříznutí a ořeže jej (nastaví mu prune_lvl).

In [ ]:
    def prune_step(self,X,y,lvl):

        #Pokud už jsme ořezaní až na kořen, k ořezání nedojde
        if self.root.decisionNode or self.root.prune_lvl:
            return 
        
        #Vypočteme odhad chyby před ořezáním
        est = self.misclassification_error(X,y,prune_lvl=lvl)
        # a zjistíme počet listů
        leaf_count = self.count_leaves()
        
        best_to_prune = None
        best_alpha  = np.Infinity

        nodelist = self.internal_nodes(prune_lvl=lvl)        
        for n in nodelist:
            
            # každý interní uzel zkusíme ořezat 
            n.prune_lvl = lvl
            
            # a vypočíst nejlepší alpha
            pruned_est = self.misclassification_error(X,y,prune_lvl=lvl+1)
            pruned_leaf_count = self.count_leaves(prune_lvl=lvl+1)
            alpha = (pruned_est - est)/(leaf_count-pruned_leaf_count)
        
            #to nejlepší (= nejnižší) alpha a odpovidající uzel si zapamatujeme
            if alpha < best_alpha:
                best_to_prune = n
                best_alpha = alpha
        
            n.prune_lvl = None
        
        # a finálně ořežeme
        best_to_prune.prune_lvl = lvl
        return

Právě popsanou metodu použijeme k vytvoření celé sekvence (fyzicky jde ale pořád o jeden strom); vypočítame odhad chyby v různých úrovních prune_lvl (podle (Xp,yp)) a najdeme tu nejlepší úroveň.

In [ ]:
    def prune(self,X,y,Xp,yp):
        # generate ...
        
        lvl = 1
        while(not self.root.prune_lvl):
            self.prune_step(X,y,lvl)
            lvl += 1


        best_error = 1
        best_lvl = None;
        
        for plvl in range(0,lvl+1):
            error = self.misclassification_error(Xp,yp,prune_lvl=plvl)
            
            if best_error >= error:
                best_lvl = plvl
                best_error = error
                
        return best_lvl;

Celý kód třídy decision_tree pak vypadá takto:

In [ ]:
import pandas as pd
import numpy as np
import math
from sklearn.metrics import accuracy_score

class decision_tree:
    
    def __init__(self): 
        self.root = None
        
    def stopping_criterion(self,X,y,lvl):
        #X   -- data v uzlu
        #y   -- jejich tridy
        #lvl -- v jake jsme hloubce
        return(y.value_counts().size == 1)
        
    def splitting_criterion(self,y,y_splits):
        return reduction_of_impurity(y,y_splits)

    def predict(self,X,prune_lvl=0):
        decisions = []
        for  _, row in X.iterrows():
            
            #sestupujeme z korene k rozhodovacimu uzlu
            actualNode = self.root
            while (not (actualNode.decisionNode or (actualNode.prune_lvl and actualNode.prune_lvl < prune_lvl))):
                
                
                
                value = row[actualNode.attribute]
                nextnode = actualNode.childs.get(value)
                
                #pokud odpovidajici podstrom neexistuje
                #pouzijeme decision z 
                if nextnode == None:
                    decisions.append(actualNode.decision)
                    break;
                    
                actualNode = nextnode;
            else:    
                decisions.append(actualNode.decision)        
        return pd.Series(decisions,dtype="category")
    
    def fit(self,X,y):
        self.root = self.induce_tree(X,y,0)
        return self
    
    def induce_tree(self,X,y,lvl=0):

        result = node()
        result.decisionValues = y.value_counts()
        result.decision = result.decisionValues.index[0]
        
        # pokud nastalo kriterium zastaveni
        # ukoncime indukci (vratime listovy uzel)
        if self.stopping_criterion(X,y,lvl):
            result.decisionNode = True
            return result;

        
        # hledame 
        information_gains = []

        for col in X.columns:
            # pripravime splity a
            # vypocteme kriterium splitu
            information_gains.append((col,self.splitting_criterion(y,split_by_values(X,y,col))))

        meanIG = np.mean( [ig for _,ig in information_gains])

        max_crit = -1
        max_attribute = None


        for col,ig in information_gains:

            if ig > meanIG:
            # pripravime splity a
            # vypocteme kriterium splitu
                crit = ig/H(X[col])
    
                if crit > max_crit:
                    max_crit = crit
                    max_attribute = col


        # OK, mame nejlepsi attribut na split
        # pokud ani tento attribut nesnizuje necistotu (kriterium je 0)
        # ukoncime indukci (vytvorime listovy uzel)
        if max_crit <= 0.0000001:
            result.decisionNode = True
            return result;


        # vygenerujeme potomky 
        max_attr_values = X[max_attribute].unique()
        max_indices = [(X[max_attribute] == value) for value in max_attr_values]
        
        

        childs = {value : self.induce_tree(X[index] , y[index],lvl+1)  for value,index in zip(max_attr_values, max_indices) }
        result.decisionNode = False
        result.childs = childs
        result.attribute = max_attribute

        return result

    def misclassification_error(self,X,y,prune_lvl=0):
        yp = self.predict(X,prune_lvl=prune_lvl)
        return (1 - accuracy_score(y, yp))
    

    def copy(self):
        
        newtree = decision_tree()
        newtree.root = self.root.copy()        
        return(newtree)

    def nodes(self,prune_lvl=0):
        return nodes_aux(self.root,leaves_only=False,prune_lvl=prune_lvl)

    def internal_nodes(self,prune_lvl=0):
        return nodes_aux(self.root,leaves_only=False,add_leaves=False,prune_lvl=prune_lvl)
            
    def leaves(self,prune_lvl=0):
        return nodes_aux(self.root,leaves_only=True,prune_lvl=prune_lvl)
    
    def count_leaves(self,prune_lvl=0):
        return len(self.leaves(prune_lvl=prune_lvl))
        #TODO


    def prune_step(self,X,y,lvl):

        if self.root.decisionNode or self.root.prune_lvl:
            return 
        
        est = self.misclassification_error(X,y,prune_lvl=lvl)
        leaf_count = self.count_leaves()
        
        nodelist = self.internal_nodes(prune_lvl=lvl)


        best_to_prune = None
        best_alpha  = np.Infinity

        for n in nodelist:
            n.prune_lvl = lvl
            
            pruned_est = self.misclassification_error(X,y,prune_lvl=lvl+1)
            pruned_leaf_count = self.count_leaves(prune_lvl=lvl+1)
            alpha = (pruned_est - est)/(leaf_count-pruned_leaf_count)
        
            if alpha < best_alpha:
                best_to_prune = n
                best_alpha = alpha
        
            n.prune_lvl = None
        
        best_to_prune.prune_lvl = lvl
        return

    def prune(self,X,y,Xp,yp):
        # generate ...
        
        lvl = 1
        while(not self.root.prune_lvl):
            self.prune_step(X,y,lvl)
            lvl += 1


        best_error = 1
        best_lvl = None;
        
        for plvl in range(0,lvl+1):
            error = self.misclassification_error(Xp,yp,prune_lvl=plvl)
                        
            if best_error >= error:
                best_lvl = plvl
                best_error = error
                
        return best_lvl;

Můžeme testnout:

In [ ]:
import os
from sklearn.model_selection import train_test_split

os.chdir("...")

columns = ["class",
           "handicapped-infants",
           "water-project-cost-sharing",
           "adoption-of-the-budget-resolution",
           "physician-fee-freeze",
           "el-salvador-aid",
           "religious-groups-in-schools",
           "anti-satellite-test-ban",
           "aid-to-nicaraguan-contras",
           "mx-missile",
           "immigration",
           "synfuels-corporation-cutback",
           "education-spending",
           "superfund-right-to-sue",
           "crime",
           "duty-free-exports",
           "export-administration-act-south-africa"]

dataset = pd.read_csv("c:/Users/Babylon/Dropbox/_TODO/2018_02/ZZD/house-votes-84.data", names=columns, index_col=None,na_values="?")
dataset = dataset.dropna().reset_index(drop=True)
X = dataset.drop("class", axis=1)
y = dataset["class"]


Xt, Xp, yt, yp = train_test_split(X, y, test_size=0.2)

DT = decision_tree()    
DT.fit(X,y)
lvl = DT.prune(Xt,yt,Xp,yp)
print(lvl)
print(DT.misclassification_error(Xt,yt,prune_lvl=lvl))
print(DT.misclassification_error(Xp,yp,prune_lvl=lvl))