In [7]:
import pandas as pd
import numpy as np
import math
import os

os.chdir("D:/WORK/Dropbox/__ZZD__/ZZD/")
In [8]:
columns = ["class",
           "handicapped-infants",
           "water-project-cost-sharing",
           "adoption-of-the-budget-resolution",
           "physician-fee-freeze",
           "el-salvador-aid",
           "religious-groups-in-schools",
           "anti-satellite-test-ban",
           "aid-to-nicaraguan-contras",
           "mx-missile",
           "immigration",
           "synfuels-corporation-cutback",
           "education-spending",
           "superfund-right-to-sue",
           "crime",
           "duty-free-exports",
           "export-administration-act-south-africa"]

dataset = pd.read_csv("house-votes-84.data", names=columns, index_col=None)

#CHANGE: nemusime uz zahazovat radky s chybejicimi hodnotami. 

#dataset = dataset.dropna().reset_index(drop=True)
X = dataset.drop("class", axis=1)
y = dataset["class"]
In [9]:
# CHANGE
#uprava, H a dalsi procedury budou brat seznam hodnot, ktere zastupuji prazdne
#hodnoty, a ty budou ignorovat ::
#
#tedy, pokud pozdeji budu potrebovat chybejici hodnoty chtit povazovat za
#specialni hodnoty (pri normalizaci kriteria splitu), poslu prazdny seznam.

def H(y,na_values=[]):

    attr_vals = [val for val in y.unique() if val not in na_values]
    y = list(y)
    y_counts = [y.count(val) for val in attr_vals]
    total = sum(y_counts)

    result = 0
    for count in y_counts:
        if count == 0: continue
        p = count/total
        result -= p * math.log2(p)
    return result


def reduction_of_impurity(y,y_splits,impurity=H,na_values=[]):

    result = impurity(y,na_values)
    total = len(y)
    for y_split in y_splits:
        result -= len(y_split)/total * impurity(y_split,na_values)
    return result

def split_by_values(X,y,col,na_values=[]):    

    attr_values = [x for x in X[col].unique() if x not in na_values]

    indices = [(X[col] == value) for value in attr_values]

    return [y[index] for index in indices]
In [10]:
from sklearn.metrics import accuracy_score

class node:

    def __init__(self):

        # attribut, podle ktereho se rozhodujeme
        self.attribute = None
        
        self.decisionNode = False
        # priznak, jestli je to rozhodovaci 
        # uzel (list)
        self.decision = None
        self.prune_lvl = None
        
        # slovnik:
        self.childs = {}
        self.parent = None
        self.decisionValues = None
        
        # weight
        self.weight = 1


    def copy(self):
        # vytvori kopii uzlu
        newnode = node()
        newnode.attribute = self.attribute
        newnode.decision = self.decision
        newnode.decisionNode = self.decisionNode
        newnode.parent = self.parent
        newnode.decisionValues = self.decisionValues
        if not newnode.decisionNode:
            newnode.childs =  {v : c.copy() for v,c in self.childs.items()}
        newnode.size = self.size
        
        return(newnode)
In [11]:
def nodes_aux(node,leaves_only=False,add_leaves=True,prune_lvl=0):
    # pomocna funkce (linearizace stromu)
    # vytvori seznam vsech uzlu, nebo jen listu, nebo vsech vnitrnich uzlu
    # v zavislosti na argumentech leaves_only,a add_leaves
    
    # bere v uvahu prune_level
        
    if node.decisionNode or (node.prune_lvl and node.prune_lvl < prune_lvl):
        if add_leaves:
            return([node])
        else:
            return([])
            
    else:
        res = []
        
        if not leaves_only: 
            res.append(node)
            
        for c in node.childs.values():
            res.extend(nodes_aux(c,leaves_only=leaves_only,add_leaves=add_leaves,prune_lvl=prune_lvl))
        return(res)
In [12]:
class decision_tree:
    
    def __init__(self): 
        self.root = None
        
        #pridan seznam hodnot, ktery ma inducer povazovat za 
        #reprezentaci chybejici hodnoty
        self.missing_values = [pd.np.nan]
        
    def stopping_criterion(self,X,y,lvl):
        #X   -- data v uzlu
        #y   -- jejich tridy
        #lvl -- v jake jsme hloubce
        
        if lvl > 10:
            return True
        
        # nasledujici uprava je spise jen pro jistotu
        # cilovy attribut by nemel mit chybejici hodnoty
        target_values = [x for x in y.unique() if not x in self.missing_values]

        return(len(target_values) <= 1)
        
        
    def splitting_criterion(self,y,y_splits,na_values):
        return reduction_of_impurity(y,y_splits,na_values=na_values)


    def predict(self,X,prune_lvl=0,max_depth=-1):
        decisions = []
        for  _, row in X.iterrows():
            
            depth = 0
            
            #sestupujeme z korene k rozhodovacimu uzlu
            actualNode = self.root
            while (not (actualNode.decisionNode 
                        or 
                        (actualNode.prune_lvl and actualNode.prune_lvl < prune_lvl) 
                        or 
                        (depth == max_depth)
                        )):
                
                value = row[actualNode.attribute]
                nextnode = actualNode.childs.get(value)
                
                #pokud odpovidajici podstrom neexistuje
                #pouzijeme decision z aktualniho uzlu
                if nextnode == None:
                    decisions.append(actualNode.decision)
                    break;
                    
                depth = depth + 1
                actualNode = nextnode;
            else:    
                decisions.append(actualNode.decision)        
        return pd.Series(decisions,dtype="category")
    
    def fit(self,X,y):
        self.root = self.induce_tree(X,y,0)
        return self
    
    
    
    #CHANGE
    #pridana vaha (ovsem pri predict se jeste nepouziva)
    #pridan seznam pouzitych sloupcu, aby se zamezilo cykleni
    def induce_tree(self,X,y,lvl=0,weight=1,used_columns=[]):

        result = node()
        result.decisionValues = y.value_counts()
        
        #CHANGE
        #nastavime predanou vahu
        result.weight = weight
                
        result.decision = result.decisionValues.index[0]
        
        # pokud nastalo kriterium zastaveni
        # ukoncime indukci (vratime listovy uzel)
        if self.stopping_criterion(X,y,lvl):
            result.decisionNode = True
            return result;

        
        # hledame maximalni redukci necistoty
        information_gains = []

        for col in X.columns:
            # pripravime splity a
            # vypocteme kriterium splitu
            w = len([val for val in X[col] if val not in self.missing_values]) / len(X[col])            
            information_gains.append((col, w * self.splitting_criterion(y,split_by_values(X,y,col,self.missing_values),self.missing_values)))

        meanIG = np.mean( [ig for _,ig in information_gains])


        max_crit = -1
        max_attribute = None


        for col,ig in information_gains:
            if ig > max_crit:
                max_crit = ig
                max_attribute = col

        # OK, mame nejlepsi attribut na split
        # pokud ani tento attribut nesnizuje necistotu (kriterium je 0)
        # ukoncime indukci (vytvorime listovy uzel)
        if max_crit <= 0.1:
            result.decisionNode = True
            return result;


        # vygenerujeme potomky 
        max_attr_values = [x for x in (X[max_attribute].unique()) if not x in self.missing_values]       
        max_indices = [(X[max_attribute] == value) for value in max_attr_values]
 

        #CHANGE 
        # potrebujeme hodnoty, u kterych chybi vybrany attribut dat do vsech podstromu....
        # a taky predat informaci o vaze
        max_indices_null = [(x in self.missing_values) for x in X[max_attribute]]
        
        #childs = {value : self.induce_tree(X[index] , y[index],lvl+1, weight=sum(index)/len(X))  for value,index in zip(max_attr_values, max_indices) }
        childs = {value : self.induce_tree(X[index | max_indices_null] , y[index | max_indices_null],lvl+1, sum(index)/len(X),used_columns+ [max_attribute])  for value,index in zip(max_attr_values, max_indices) }
        result.decisionNode = False
        result.childs = childs
                
        result.attribute = max_attribute

        return result

    def misclassification_error(self,X,y,prune_lvl=0,max_depth=-1):
        yp = self.predict(X,prune_lvl=prune_lvl,max_depth=max_depth)
        return (1 - accuracy_score(y, yp))
    
    
    # vytvor kopii stromu
    def copy(self):    
        newtree = decision_tree()
        newtree.root = self.root.copy()        
        return(newtree)

    # vyuziti funkce nodes_aux    
    def nodes(self,prune_lvl=0):
        return nodes_aux(self.root,leaves_only=False,prune_lvl=prune_lvl)

    def internal_nodes(self,prune_lvl=0):
        return nodes_aux(self.root,leaves_only=False,add_leaves=False,prune_lvl=prune_lvl)
            
    def leaves(self,prune_lvl=0):
        return nodes_aux(self.root,leaves_only=True,prune_lvl=prune_lvl)
    
    def count_leaves(self,prune_lvl=0):
        return len(self.leaves(prune_lvl=prune_lvl))

    # krok orezavani
    def prune_step(self,X,y,lvl):

        #limitni podminka, pokud je uroven orezani 0
        #nebo jsme u korene, skoncime
        if self.root.decisionNode or self.root.prune_lvl:
            return 
        
        #najdi odhad chyby, pocet listu a seznam vnitrnich uzlu 
        est = self.misclassification_error(X,y,prune_lvl=lvl)
        leaf_count = self.count_leaves()
        nodelist = self.internal_nodes(prune_lvl=lvl)


        #projdeme vsechny uzly, a najdeme ten, ktery 
        #ma nejnizsi hodnotu alfa
        best_to_prune = None
        best_alpha  = np.Infinity

        for n in nodelist:
            n.prune_lvl = lvl
            
            pruned_est = self.misclassification_error(X,y,prune_lvl=lvl+1)
            pruned_leaf_count = self.count_leaves(prune_lvl=lvl+1)
            alpha = (pruned_est - est)/(leaf_count-pruned_leaf_count)
        
            if alpha < best_alpha:
                best_to_prune = n
                best_alpha = alpha
        
            n.prune_lvl = None
        
        # nastavime nalezeny list jako orezany
        # na teto urovni
        best_to_prune.prune_lvl = lvl
        return

    # samotne orezavani
    def prune(self,X,y,Xp,yp):
        # generate vygenerujeme sekvenci orezanych
        # rozhodovacich stromu (urovne orezani)
        
        lvl = 1
        while(not self.root.prune_lvl):
            self.prune_step(X,y,lvl)
            lvl += 1


        best_error = 1
        best_lvl = None;
        
        # najdeme ty, ktere jsou neorezane
        for plvl in range(0,lvl+1):
            error = self.misclassification_error(Xp,yp,prune_lvl=plvl)
            
            if best_error >= error:
                best_lvl = plvl
                best_error = error
                
        return best_lvl;
In [16]:
from sklearn.model_selection import train_test_split

Xt, Xp, yt, yp = train_test_split(X, y, test_size=0.2)

DT = decision_tree()
DT.missing_values.append("?")
DT.fit(Xt,yt)

for max_d in range(7):
    print(DT.misclassification_error(Xp,yp,max_depth=max_d))
0.3908045977011494
0.011494252873563204
0.011494252873563204
0.03448275862068961
0.011494252873563204
0.02298850574712641
0.02298850574712641