Rozhodovací stromy (decision trees)

In [1]:
import pandas as pd
import numpy as np
import math
In [2]:
import os

os.chdir("d:/WORK/Dropbox/__ZZD__/ZZD/")

Načtení datasetu mushroom (UCI ML Repository)

In [3]:
columns = ["edible", "cap-shape", "cap-surface", "cap-color", "bruises?",
        "odor", "gill-attachment", "gill-spacing", "gill-size", "gill-color",
        "stalk-shape", "stalk-root", "stalk-surface-above-ring",
        "stalk-surface-below-ring", "stalk-color-above-ring",
        "stalk-color-below-ring", "veil-type", "veil-color", "ring-number",
        "ring-type", "spore-print-color", "population", "habitat"
        ]
dataset = pd.read_csv("mushroom.data",
                      names=columns, index_col=None)

X = dataset.drop("edible", axis=1)
y = dataset["edible"]
In [4]:
X.head()
Out[4]:
cap-shape cap-surface cap-color bruises? odor gill-attachment gill-spacing gill-size gill-color stalk-shape ... stalk-surface-below-ring stalk-color-above-ring stalk-color-below-ring veil-type veil-color ring-number ring-type spore-print-color population habitat
0 x s n t p f c n k e ... s w w p w o p k s u
1 x s y t a f c b k e ... s w w p w o p n n g
2 b s w t l f c b n e ... s w w p w o p n n m
3 x y w t p f c n n e ... s w w p w o p k s u
4 x s g f n f w b k t ... s w w p w o e n a g

5 rows × 22 columns

Budeme potřebovat zjistit unikatní hodnoty ve sloupci:

In [5]:
col = "cap-shape"
attr_values = X[col].unique()
attr_values
Out[5]:
array(['x', 'b', 's', 'f', 'k', 'c'], dtype=object)

A počty jednotlivých hodnot:

In [6]:
X[col].value_counts()
Out[6]:
x    3656
f    3152
k     828
b     452
s      32
c       4
Name: cap-shape, dtype: int64

Meření nečistoty (prozatím entropie)

Meření nejistoty budeme používat jako kritérium, podle kterého budeme vybírat attribut, dle kterého rozdělíme data.

In [7]:
def H(y):
    total = len(y)    
    result = 0
    for count in y.value_counts():
        if count == 0: continue
        p = count/total
        result -= p * math.log2(p)
    return result

H(y)
Out[7]:
0.9990678968724604

Přesněji, budeme vybírat ten attribut, který zajistí největší rozdíl v nečistotě

In [8]:
def reduction_of_impurity(y,y_splits,impurity=H):
    result = impurity(y)
    total = len(y)
    for y_split in y_splits:
        result -= len(y_split)/total * impurity(y_split)
    return result

Zamýšlené použití:

In [9]:
attr_values = X[col].unique()
indices = [(X[col] == value) for value in attr_values]
reduction_of_impurity(y,[y[index] for index in indices])
Out[9]:
0.04879670193537308

můžeme to zabalit do funkce:

In [10]:
def split_by_values(X,y,col):
    attr_values = X[col].unique()
    indices = [(X[col] == value) for value in attr_values]
    return [y[index] for index in indices]

reduction_of_impurity(y,split_by_values(X,y,col))
Out[10]:
0.04879670193537308

Uzel rozhodovacího stromu

(vše v komentářích)

In [11]:
class node:

    def __init__(self):

        # priznak jestli jde o interni uzel [False] 
        # nebo o listovy (rozhodovaci) uzel [True]
        self.decisionNode = False

        # attribut, podle ktereho se rozhodujeme
        # internim uzlu
        self.attribute = None
        
        # cilovy attribut -- budeme jej uvazovat 
        # nejen v 
        self.decision = None
        
        # pocty pripadu z trenovacich dat
        # ktere byly pouzity pri indukci tohoho
        # uzlu
        self.decisionValues = None
        
        # potomci a predek
        self.childs = {}
        self.parent = None

Základní verze rozhodovacího stromu

kritérium zastavení

necháme volné (zastavíme, až to bude nutné)

kritérium splitu

použijeme měrení nečistoty uvedené výše

predikce

sestupujeme od korene k rozhodovacímu uzlu

In [12]:
class decision_tree:
    
    def __init__(self): 
        self.root = None
        
    def stopping_criterion(self,X,y,lvl):
        #X   -- data v uzlu
        #y   -- jejich tridy
        #lvl -- v jake jsme hloubce
        return(y.value_counts().size == 1)
        
    def splitting_criterion(self,y,y_splits):
        return reduction_of_impurity(y,y_splits)

    def predict(self,X):
        decisions = []
        for  _, row in X.iterrows():
            
            #sestupujeme z korene k rozhodovacimu uzlu
            actualNode = self.root
            while (not actualNode.decisionNode):
                value = row[actualNode.attribute]
                nextnode = actualNode.childs.get(value)
                
                #pokud odpovidajici podstrom neexistuje
                #pouzijeme decision z 
                if nextnode == None:
                    decisions.append(actualNode.decision)
                    break;
                    
                actualNode = nextnode;
            else:    
                decisions.append(actualNode.decision)        
        return pd.Series(decisions,dtype="category")
    
    def fit(self,X,y):
        self.root = self.induce_tree(X,y,0)
        return self
    
    def induce_tree(self,X,y,lvl=0):

        result = node()
        result.decisionValues = y.value_counts()
        result.decision = result.decisionValues.index[0]


        # pokud nastalo kriterium zastaveni
        # ukoncime indukci (vratime listovy uzel)
        if self.stopping_criterion(X,y,lvl):
            result.decisionNode = True
            return result;

        
        # jinak hledame atribut s max. kriteriem splitu
        max_crit = -1
        max_attribute = None

        for col in X.columns:

            # pripravime splity a
            # vypocteme kriterium splitu
            crit = self.splitting_criterion(y,split_by_values(X,y,col))

            if crit > max_crit:
                max_crit = crit
                max_attribute = col


        # OK, mame nejlepsi attribut na split
        # pokud ani tento attribut nesnizuje necistotu (kriterium je 0)
        # ukoncime indukci (vytvorime listovy uzel)
        if max_crit <= 0.0000001:
            result.decisionNode = True
            return result;


        # vygenerujeme potomky 
        max_attr_values = X[max_attribute].unique()
        max_indices = [(X[max_attribute] == value) for value in max_attr_values]
        
        

        childs = {value : self.induce_tree(X[index] , y[index],lvl+1)  for value,index in zip(max_attr_values, max_indices) }
        result.decisionNode = False
        result.childs = childs
        result.attribute = max_attribute

        return result

Můžeme testnout:

In [13]:
DT = decision_tree()    
DT.fit(X,y)
DT.predict(X)
Out[13]:
0       p
1       e
2       e
3       p
4       e
5       e
6       e
7       e
8       p
9       e
10      e
11      e
12      e
13      p
14      e
15      e
16      e
17      p
18      p
19      p
20      e
21      p
22      e
23      e
24      e
25      p
26      e
27      e
28      e
29      e
       ..
8094    e
8095    p
8096    e
8097    p
8098    p
8099    e
8100    e
8101    p
8102    e
8103    e
8104    e
8105    e
8106    e
8107    e
8108    p
8109    e
8110    e
8111    e
8112    e
8113    p
8114    p
8115    e
8116    p
8117    p
8118    p
8119    e
8120    e
8121    e
8122    p
8123    e
Length: 8124, dtype: category
Categories (2, object): [e, p]

a vyhodnotit s pomocí k-složkové křížové validace:

In [14]:
# k-fold cross-validation
# k = 10

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score

k = 10
kf = StratifiedKFold(n_splits=k, shuffle=True)


acc = 0

for train_index, test_index in kf.split(X,y):
    X_train, X_test = X.iloc[train_index,:], X.iloc[test_index,:]
    y_train, y_test = y.iloc[train_index], y[test_index]


    DT.fit(X_train,y_train)    
    y_pred = DT.predict(X_test)

    acc += accuracy_score(y_test, y_pred)
    
print(acc/k)
1.0