import pandas as pd
import numpy as np
import math
import os
os.chdir("D:/WORK/Dropbox/__ZZD__/ZZD/")
columns = ["class",
"handicapped-infants",
"water-project-cost-sharing",
"adoption-of-the-budget-resolution",
"physician-fee-freeze",
"el-salvador-aid",
"religious-groups-in-schools",
"anti-satellite-test-ban",
"aid-to-nicaraguan-contras",
"mx-missile",
"immigration",
"synfuels-corporation-cutback",
"education-spending",
"superfund-right-to-sue",
"crime",
"duty-free-exports",
"export-administration-act-south-africa"]
dataset = pd.read_csv("house-votes-84.data", names=columns, index_col=None)
# CHANGE: we no longer need to drop the rows with missing values.
#dataset = dataset.dropna().reset_index(drop=True)
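# A quick, hypothetical sanity check of the raw data (the house-votes-84 file
# encodes a missing vote as the literal string "?", which read_csv keeps
# as-is); the frame should have 435 rows and 17 columns:
print(dataset.shape)
print((dataset == "?").sum().sum(), "entries encoded as '?'")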
X = dataset.drop("class", axis=1)
y = dataset["class"]
# CHANGE
# H and the procedures below now take a list of values that stand in for
# missing entries, and they ignore those values.
#
# So if I later want to treat missing values as special values in their own
# right (when normalizing the split criterion), I just pass an empty list.
# entropy of the target series y; values listed in na_values are ignored
def H(y, na_values=[]):
attr_vals = [val for val in y.unique() if val not in na_values]
y = list(y)
y_counts = [y.count(val) for val in attr_vals]
total = sum(y_counts)
result = 0
for count in y_counts:
if count == 0: continue
p = count/total
result -= p * math.log2(p)
return result
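# A minimal sanity check of H on hypothetical probe values (not taken from
# the dataset): a pure sample has zero entropy, a balanced binary sample has
# exactly one bit, and values listed in na_values are ignored entirely.
assert H(pd.Series(["y", "y", "y"])) == 0
assert H(pd.Series(["y", "n"])) == 1.0
assert H(pd.Series(["y", "n", "?", "?"]), na_values=["?"]) == 1.0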
# impurity reduction of a split: the impurity of y minus the size-weighted
# impurities of the individual parts in y_splits
def reduction_of_impurity(y, y_splits, impurity=H, na_values=[]):
result = impurity(y,na_values)
total = len(y)
for y_split in y_splits:
result -= len(y_split)/total * impurity(y_split,na_values)
return result
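# A small illustration with a hypothetical target: a split that separates the
# two classes perfectly recovers the full entropy of y as its reduction.
_toy = pd.Series(["a", "a", "b", "b"])
assert reduction_of_impurity(_toy, [_toy[:2], _toy[2:]]) == 1.0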
# split y according to the values of column col in X; rows whose value of col
# is listed in na_values end up in no split
def split_by_values(X, y, col, na_values=[]):
attr_values = [x for x in X[col].unique() if x not in na_values]
indices = [(X[col] == value) for value in attr_values]
return [y[index] for index in indices]
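# split_by_values on a hypothetical toy frame: the row whose value of "f" is
# listed in na_values is left out of every split.
_Xtoy = pd.DataFrame({"f": ["y", "n", "?", "y"]})
_ytoy = pd.Series([1, 0, 1, 1])
assert [list(s) for s in split_by_values(_Xtoy, _ytoy, "f", na_values=["?"])] == [[1, 1], [0]]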
from sklearn.metrics import accuracy_score
class node:
    def __init__(self):
        # the attribute this node tests
        self.attribute = None
        # flag marking a decision node (a leaf)
        self.decisionNode = False
        self.decision = None
        self.prune_lvl = None
        # dictionary: attribute value -> child subtree
        self.childs = {}
        self.parent = None
        self.decisionValues = None
        # weight of the node: the fraction of its parent's rows that match
        # this branch (not used by predict yet)
        self.weight = 1
    def copy(self):
        # create a deep copy of the node (and, recursively, of its subtrees)
        newnode = node()
        newnode.attribute = self.attribute
        newnode.decision = self.decision
        newnode.decisionNode = self.decisionNode
        newnode.parent = self.parent
        newnode.decisionValues = self.decisionValues
        newnode.prune_lvl = self.prune_lvl
        newnode.weight = self.weight
        if not newnode.decisionNode:
            newnode.childs = {v: c.copy() for v, c in self.childs.items()}
        return newnode
def nodes_aux(node, leaves_only=False, add_leaves=True, prune_lvl=0):
    # helper (tree linearization): returns a list of all nodes, only the
    # leaves, or only the internal nodes, depending on leaves_only and
    # add_leaves; nodes pruned below prune_lvl are treated as leaves
    if node.decisionNode or (node.prune_lvl and node.prune_lvl < prune_lvl):
        if add_leaves:
            return [node]
        else:
            return []
    else:
        res = []
        if not leaves_only:
            res.append(node)
        for c in node.childs.values():
            res.extend(nodes_aux(c, leaves_only=leaves_only,
                                 add_leaves=add_leaves, prune_lvl=prune_lvl))
        return res
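# nodes_aux on a hand-built two-node tree (hypothetical): the root is an
# internal node and its single child a leaf, so leaves_only returns just the
# child, while the full listing is root first, then the leaf.
_leaf = node()
_leaf.decisionNode = True
_root = node()
_root.childs = {"y": _leaf}
assert nodes_aux(_root, leaves_only=True) == [_leaf]
assert nodes_aux(_root) == [_root, _leaf]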
class decision_tree:
    def __init__(self):
        self.root = None
        # list of values the inducer should treat as representations
        # of a missing value
        self.missing_values = [np.nan]
    def stopping_criterion(self, X, y, lvl):
        # X -- the data in this node
        # y -- their classes
        # lvl -- current depth
        if lvl > 10:
            return True
        # the following filter is mostly a safety measure: the target
        # attribute should not contain missing values
        target_values = [x for x in y.unique() if x not in self.missing_values]
        return len(target_values) <= 1
    def splitting_criterion(self, y, y_splits, na_values):
        # split criterion: reduction of impurity (information gain for H)
        return reduction_of_impurity(y, y_splits, na_values=na_values)
    def predict(self, X, prune_lvl=0, max_depth=-1):
        decisions = []
        for _, row in X.iterrows():
            depth = 0
            # descend from the root towards a decision node
            actualNode = self.root
            while not (actualNode.decisionNode
                       or (actualNode.prune_lvl and actualNode.prune_lvl < prune_lvl)
                       or depth == max_depth):
                value = row[actualNode.attribute]
                nextnode = actualNode.childs.get(value)
                # if there is no matching subtree, fall back to the
                # decision stored in the current node
                if nextnode is None:
                    decisions.append(actualNode.decision)
                    break
                depth += 1
                actualNode = nextnode
            else:
                decisions.append(actualNode.decision)
        return pd.Series(decisions, dtype="category")
    def fit(self, X, y):
        self.root = self.induce_tree(X, y, 0)
        return self
    # CHANGE
    # added a weight (not used by predict yet)
    # added the list of columns already used on the path, to prevent cycling
    def induce_tree(self, X, y, lvl=0, weight=1, used_columns=[]):
        result = node()
        result.decisionValues = y.value_counts()
        # CHANGE: store the weight we were given
        result.weight = weight
        result.decision = result.decisionValues.index[0]
        # if the stopping criterion holds, stop the induction
        # (return a leaf node)
        if self.stopping_criterion(X, y, lvl):
            result.decisionNode = True
            return result
        # look for the maximal reduction of impurity, skipping columns
        # already used on the path from the root (this is what prevents
        # cycling now that missing-value rows enter every subtree)
        information_gains = []
        for col in X.columns:
            if col in used_columns:
                continue
            # prepare the splits and evaluate the split criterion, weighted
            # by the fraction of rows where the attribute is not missing
            w = len([val for val in X[col] if val not in self.missing_values]) / len(X[col])
            information_gains.append((col, w * self.splitting_criterion(
                y, split_by_values(X, y, col, self.missing_values), self.missing_values)))
        max_crit = -1
        max_attribute = None
        for col, ig in information_gains:
            if ig > max_crit:
                max_crit = ig
                max_attribute = col
        # OK, we have the best attribute to split on; if even this attribute
        # does not reduce the impurity enough (criterion at most 0.1),
        # stop the induction (return a leaf node)
        if max_crit <= 0.1:
            result.decisionNode = True
            return result
        # generate the children
        max_attr_values = [x for x in X[max_attribute].unique()
                           if x not in self.missing_values]
        max_indices = [(X[max_attribute] == value) for value in max_attr_values]
        # CHANGE
        # rows where the chosen attribute is missing have to go into every
        # subtree, and the weight information has to be passed down
        max_indices_null = [(x in self.missing_values) for x in X[max_attribute]]
        childs = {value: self.induce_tree(X[index | max_indices_null],
                                          y[index | max_indices_null],
                                          lvl + 1, sum(index) / len(X),
                                          used_columns + [max_attribute])
                  for value, index in zip(max_attr_values, max_indices)}
        result.decisionNode = False
        result.childs = childs
        result.attribute = max_attribute
        return result
    def misclassification_error(self, X, y, prune_lvl=0, max_depth=-1):
        yp = self.predict(X, prune_lvl=prune_lvl, max_depth=max_depth)
        return 1 - accuracy_score(y, yp)
    # create a copy of the tree
    def copy(self):
        newtree = decision_tree()
        newtree.root = self.root.copy()
        return newtree
    # thin wrappers around nodes_aux
    def nodes(self, prune_lvl=0):
        return nodes_aux(self.root, leaves_only=False, prune_lvl=prune_lvl)
    def internal_nodes(self, prune_lvl=0):
        return nodes_aux(self.root, leaves_only=False, add_leaves=False,
                         prune_lvl=prune_lvl)
    def leaves(self, prune_lvl=0):
        return nodes_aux(self.root, leaves_only=True, prune_lvl=prune_lvl)
    def count_leaves(self, prune_lvl=0):
        return len(self.leaves(prune_lvl=prune_lvl))
    # one pruning step
    def prune_step(self, X, y, lvl):
        # base case: if the root is a leaf, or has already been pruned,
        # there is nothing left to prune
        if self.root.decisionNode or self.root.prune_lvl:
            return
        # error estimate, leaf count and the list of internal nodes of the
        # tree as pruned so far
        est = self.misclassification_error(X, y, prune_lvl=lvl)
        leaf_count = self.count_leaves(prune_lvl=lvl)
        nodelist = self.internal_nodes(prune_lvl=lvl)
        # walk over all internal nodes and find the one with the lowest
        # value of alpha (error increase per removed leaf)
        best_to_prune = None
        best_alpha = np.inf
        for n in nodelist:
            n.prune_lvl = lvl
            pruned_est = self.misclassification_error(X, y, prune_lvl=lvl + 1)
            pruned_leaf_count = self.count_leaves(prune_lvl=lvl + 1)
            removed = leaf_count - pruned_leaf_count
            alpha = (pruned_est - est) / removed if removed > 0 else np.inf
            if best_to_prune is None or alpha < best_alpha:
                best_to_prune = n
                best_alpha = alpha
            n.prune_lvl = None
        # mark the node we found as pruned at this level
        best_to_prune.prune_lvl = lvl
        return
    # the pruning procedure itself
    def prune(self, X, y, Xp, yp):
        # generate the sequence of pruned decision trees
        # (one pruning level per step)
        lvl = 1
        while not self.root.prune_lvl:
            self.prune_step(X, y, lvl)
            lvl += 1
        best_error = 1
        best_lvl = None
        # pick the pruning level with the lowest error on the pruning set
        for plvl in range(0, lvl + 1):
            error = self.misclassification_error(Xp, yp, prune_lvl=plvl)
            if best_error >= error:
                best_lvl = plvl
                best_error = error
        return best_lvl
from sklearn.model_selection import train_test_split
Xt, Xp, yt, yp = train_test_split(X, y, test_size=0.2)
DT = decision_tree()
DT.missing_values.append("?")
DT.fit(Xt,yt)
for max_d in range(7):
print(DT.misclassification_error(Xp,yp,max_depth=max_d))
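# A sketch of how the pruning routine could be wired in (assumes a further
# hold-out split of the training part; the names Xtt, Xv, ytt, yv and PT are
# hypothetical, not part of the original experiment):
Xtt, Xv, ytt, yv = train_test_split(Xt, yt, test_size=0.25)
PT = decision_tree()
PT.missing_values.append("?")
PT.fit(Xtt, ytt)
best_lvl = PT.prune(Xtt, ytt, Xv, yv)
print("best prune level:", best_lvl,
      "held-out error:", PT.misclassification_error(Xp, yp, prune_lvl=best_lvl))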