merged python samples

Marina Kolpakova 2012-06-28 17:33:11 +00:00
parent 54ee92e3b0
commit ab69f5e091
3 changed files with 324 additions and 62 deletions

View File

@@ -1,78 +1,130 @@
'''
SVM and KNearest digit recognition.

Sample loads a dataset of handwritten digits from 'digits.png'.
Then it trains SVM and KNearest classifiers on it and evaluates
their accuracy. Moment-based image deskew is used to improve
the recognition accuracy.

Usage:
   digits.py
'''

import numpy as np
import cv2
from multiprocessing.pool import ThreadPool

from common import clock, mosaic

SZ = 20 # size of each digit is SZ x SZ
CLASS_N = 10

def load_digits(fn):
    print 'loading "%s" ...' % fn
    digits_img = cv2.imread(fn, 0)
    h, w = digits_img.shape
    digits = [np.hsplit(row, w/SZ) for row in np.vsplit(digits_img, h/SZ)]
    digits = np.array(digits).reshape(-1, SZ, SZ)
    labels = np.repeat(np.arange(CLASS_N), len(digits)/CLASS_N)
    return digits, labels

def deskew(img):
    # straighten a digit using its second order image moments
    m = cv2.moments(img)
    if abs(m['mu02']) < 1e-2:
        return img.copy()
    skew = m['mu11']/m['mu02']
    M = np.float32([[1, skew, -0.5*SZ*skew], [0, 1, 0]])
    img = cv2.warpAffine(img, M, (SZ, SZ), flags=cv2.WARP_INVERSE_MAP | cv2.INTER_LINEAR)
    return img

class StatModel(object):
    def load(self, fn):
        self.model.load(fn)
    def save(self, fn):
        self.model.save(fn)

class KNearest(StatModel):
    def __init__(self, k = 3):
        self.k = k
        self.model = cv2.KNearest()

    def train(self, samples, responses):
        self.model = cv2.KNearest()
        self.model.train(samples, responses)

    def predict(self, samples):
        retval, results, neigh_resp, dists = self.model.find_nearest(samples, self.k)
        return results.ravel()

class SVM(StatModel):
    def __init__(self, C = 1, gamma = 0.5):
        self.params = dict( kernel_type = cv2.SVM_RBF,
                            svm_type = cv2.SVM_C_SVC,
                            C = C,
                            gamma = gamma )
        self.model = cv2.SVM()

    def train(self, samples, responses):
        self.model = cv2.SVM()
        self.model.train(samples, responses, params = self.params)

    def predict(self, samples):
        return self.model.predict_all(samples).ravel()

def evaluate_model(model, digits, samples, labels):
    resp = model.predict(samples)
    err = (labels != resp).mean()
    print 'error: %.2f %%' % (err*100)

    confusion = np.zeros((10, 10), np.int32)
    for i, j in zip(labels, resp):
        confusion[i, j] += 1
    print 'confusion matrix:'
    print confusion
    print

    vis = []
    for img, flag in zip(digits, resp == labels):
        img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
        if not flag:
            img[...,:2] = 0
        vis.append(img)
    return mosaic(25, vis)

if __name__ == '__main__':
    print __doc__

    digits, labels = load_digits('digits.png')

    print 'preprocessing...'
    # shuffle digits
    rand = np.random.RandomState(12345)
    shuffle = rand.permutation(len(digits))
    digits, labels = digits[shuffle], labels[shuffle]

    digits2 = map(deskew, digits)
    samples = np.float32(digits2).reshape(-1, SZ*SZ) / 255.0

    train_n = int(0.9*len(samples))
    cv2.imshow('test set', mosaic(25, digits[train_n:]))
    digits_train, digits_test = np.split(digits2, [train_n])
    samples_train, samples_test = np.split(samples, [train_n])
    labels_train, labels_test = np.split(labels, [train_n])

    print 'training KNearest...'
    model = KNearest(k=1)
    model.train(samples_train, labels_train)
    vis = evaluate_model(model, digits_test, samples_test, labels_test)
    cv2.imshow('KNearest test', vis)

    print 'training SVM...'
    model = SVM(C=4.66, gamma=0.08)
    model.train(samples_train, labels_train)
    vis = evaluate_model(model, digits_test, samples_test, labels_test)
    cv2.imshow('SVM test', vis)
    print 'saving SVM as "digits_svm.dat"...'
    model.save('digits_svm.dat')

    cv2.waitKey(0)
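The classifier saved above ('digits_svm.dat') can be reloaded through StatModel.load() by any other script. A minimal sketch, assuming digits.png, common.py and the digits module from this commit are on the path (illustrative only, not part of the commit):

import numpy as np

from digits import load_digits, deskew, SVM, SZ

if __name__ == '__main__':
    digits, labels = load_digits('digits.png')
    model = SVM()
    model.load('digits_svm.dat')    # written by digits.py above
    # preprocess a single digit the same way as during training
    sample = np.float32(deskew(digits[0])).reshape(1, SZ*SZ) / 255.0
    print 'predicted: %d  actual: %d' % (model.predict(sample)[0], labels[0])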

View File

@@ -0,0 +1,136 @@
'''
Digit recognition adjustment.
Grid search is used to find the best parameters for SVM and KNearest classifiers.
SVM adjustment follows the guidelines given in
http://www.csie.ntu.edu.tw/~cjlin/papers/guide/guide.pdf

Threading or cloud computing (with http://www.picloud.com/) may be used
to speed up the computation.

Usage:
  digits_adjust.py [--model {svm|knearest}] [--cloud] [--env <PiCloud environment>]

  --model {svm|knearest}  - select the classifier (SVM is the default)
  --cloud                 - use PiCloud computing platform (for SVM only)
  --env                   - cloud environment name
'''

# TODO dataset preprocessing in cloud
# TODO cloud env setup tutorial

import numpy as np
import cv2
from multiprocessing.pool import ThreadPool

from digits import *

def cross_validate(model_class, params, samples, labels, kfold = 3, pool = None):
    n = len(samples)
    folds = np.array_split(np.arange(n), kfold)
    def f(i):
        model = model_class(**params)
        test_idx = folds[i]
        train_idx = list(folds)
        train_idx.pop(i)
        train_idx = np.hstack(train_idx)
        train_samples, train_labels = samples[train_idx], labels[train_idx]
        test_samples, test_labels = samples[test_idx], labels[test_idx]
        model.train(train_samples, train_labels)
        resp = model.predict(test_samples)
        score = (resp != test_labels).mean()
        print ".",
        return score
    if pool is None:
        scores = map(f, xrange(kfold))
    else:
        scores = pool.map(f, xrange(kfold))
    return np.mean(scores)

def adjust_KNearest(samples, labels):
    print 'adjusting KNearest ...'
    best_err, best_k = np.inf, -1
    for k in xrange(1, 9):
        err = cross_validate(KNearest, dict(k=k), samples, labels)
        if err < best_err:
            best_err, best_k = err, k
        print 'k = %d, error: %.2f %%' % (k, err*100)
    best_params = dict(k=best_k)
    print 'best params:', best_params
    return best_params

def adjust_SVM(samples, labels, usecloud=False, cloud_env=''):
    Cs = np.logspace(0, 5, 10, base=2)
    gammas = np.logspace(-7, -2, 10, base=2)
    scores = np.zeros((len(Cs), len(gammas)))
    scores[:] = np.nan

    if usecloud:
        try:
            import cloud
        except ImportError:
            print 'cloud module is not installed'
            usecloud = False
    if usecloud:
        print 'uploading dataset to cloud...'
        np.savez('train.npz', samples=samples, labels=labels)
        cloud.files.put('train.npz')

    print 'adjusting SVM (may take a long time) ...'
    def f(job):
        i, j = job
        params = dict(C = Cs[i], gamma=gammas[j])
        score = cross_validate(SVM, params, samples, labels)
        return i, j, score
    def fcloud(job):
        i, j = job
        cloud.files.get('train.npz')
        npz = np.load('train.npz')
        params = dict(C = Cs[i], gamma=gammas[j])
        score = cross_validate(SVM, params, npz['samples'], npz['labels'])
        return i, j, score

    if usecloud:
        jids = cloud.map(fcloud, np.ndindex(*scores.shape), _env=cloud_env, _profile=True)
        ires = cloud.iresult(jids)
    else:
        pool = ThreadPool(processes=cv2.getNumberOfCPUs())
        ires = pool.imap_unordered(f, np.ndindex(*scores.shape))

    for count, (i, j, score) in enumerate(ires):
        scores[i, j] = score
        print '%d / %d (best error: %.2f %%, last: %.2f %%)' % (count+1, scores.size, np.nanmin(scores)*100, score*100)
    print scores

    i, j = np.unravel_index(scores.argmin(), scores.shape)
    best_params = dict(C = Cs[i], gamma=gammas[j])
    print 'best params:', best_params
    print 'best error: %.2f %%' % (scores.min()*100)
    return best_params

if __name__ == '__main__':
    import getopt
    import sys

    print __doc__

    args, _ = getopt.getopt(sys.argv[1:], '', ['model=', 'cloud', 'env='])
    args = dict(args)
    args.setdefault('--model', 'svm')
    args.setdefault('--env', '')
    if args['--model'] not in ['svm', 'knearest']:
        print 'unknown model "%s"' % args['--model']
        sys.exit(1)

    digits, labels = load_digits('digits.png')
    shuffle = np.random.permutation(len(digits))
    digits, labels = digits[shuffle], labels[shuffle]

    digits2 = map(deskew, digits)
    samples = np.float32(digits2).reshape(-1, SZ*SZ) / 255.0

    t = clock()
    if args['--model'] == 'knearest':
        adjust_KNearest(samples, labels)
    else:
        adjust_SVM(samples, labels, usecloud='--cloud' in args, cloud_env = args['--env'])
    print 'work time: %f s' % (clock() - t)
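adjust_SVM() only reports the best parameters; here is a short sketch of feeding them back into a final model (hypothetical glue code, not part of the commit, assuming digits.py and digits_adjust.py are importable as modules):

import numpy as np

from digits import load_digits, deskew, SVM, SZ
from digits_adjust import adjust_SVM

if __name__ == '__main__':
    digits, labels = load_digits('digits.png')
    samples = np.float32(map(deskew, digits)).reshape(-1, SZ*SZ) / 255.0

    best = adjust_SVM(samples, labels)      # returns dict(C=..., gamma=...)
    model = SVM(**best)
    model.train(samples, labels)
    model.save('digits_svm.dat')            # the file the video demo below loads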

View File

@@ -0,0 +1,74 @@
import numpy as np
import cv2

import digits
import os
import video
from common import mosaic

def main():
    cap = video.create_capture()

    classifier_fn = 'digits_svm.dat'
    if not os.path.exists(classifier_fn):
        print '"%s" not found, run digits.py first' % classifier_fn
        return
    model = digits.SVM()
    model.load('digits_svm.dat')

    SZ = 20

    while True:
        ret, frame = cap.read()
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # binarize and look for digit-sized contours
        bin = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY_INV, 31, 10)
        bin = cv2.medianBlur(bin, 3)
        contours, heirs = cv2.findContours( bin.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        rects = map(cv2.boundingRect, contours)
        # keep contours with plausible digit proportions
        valid_flags = [ 16 <= h <= 64 and w <= 1.2*h for x, y, w, h in rects]
        for i, cnt in enumerate(contours):
            if not valid_flags[i]:
                continue
            # skip contours whose parent is already a valid candidate
            _, _, _, outer_i = heirs[0, i]
            if outer_i >= 0 and valid_flags[outer_i]:
                continue
            x, y, w, h = rects[i]
            cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0))
            sub = bin[y:,x:][:h,:w]
            #sub = ~cv2.equalizeHist(sub)
            #_, sub_bin = cv2.threshold(sub, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)

            # reject blobs that are almost empty or almost filled
            s = 1.5*float(h)/SZ
            m = cv2.moments(sub)
            m00 = m['m00']
            if m00/255 < 0.1*w*h or m00/255 > 0.9*w*h:
                continue
            # centre the blob centroid in a SZ x SZ patch, scaled by the bounding box height
            c1 = np.float32([m['m10'], m['m01']]) / m00
            c0 = np.float32([SZ/2, SZ/2])
            t = c1 - s*c0
            A = np.zeros((2, 3), np.float32)
            A[:,:2] = np.eye(2)*s
            A[:,2] = t
            sub1 = cv2.warpAffine(sub, A, (SZ, SZ), flags=cv2.WARP_INVERSE_MAP | cv2.INTER_LINEAR)
            sub1 = digits.deskew(sub1)
            if x+w+SZ < frame.shape[1] and y+SZ < frame.shape[0]:
                frame[y:,x+w:][:SZ, :SZ] = sub1[...,np.newaxis]

            # classify the normalized patch with the SVM trained by digits.py
            sample = np.float32(sub1).reshape(1,SZ*SZ) / 255.0
            digit = model.predict(sample)[0]
            cv2.putText(frame, '%d'%digit, (x, y), cv2.FONT_HERSHEY_PLAIN, 1.0, (200, 0, 0), thickness = 1)

        cv2.imshow('frame', frame)
        cv2.imshow('bin', bin)
        if cv2.waitKey(1) == 27:
            break

if __name__ == '__main__':
    main()
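The per-contour normalization above builds an affine A = [s*I | c1 - s*c0] and warps with WARP_INVERSE_MAP, so the centre c0 of the SZ x SZ output patch is pulled from the blob centroid c1. A tiny standalone check of that mapping (illustrative numbers only, not part of the commit):

import numpy as np

SZ = 20
s = 1.5 * 48.0 / SZ                  # example scale for a 48 px tall blob
c1 = np.float32([30.0, 25.0])        # blob centroid inside the cropped sub-image
c0 = np.float32([SZ/2, SZ/2])        # centre of the SZ x SZ output patch

A = np.zeros((2, 3), np.float32)
A[:, :2] = np.eye(2) * s
A[:, 2] = c1 - s * c0

# WARP_INVERSE_MAP interprets A as the output->input mapping,
# so the patch centre should land on the centroid:
print A[:, :2].dot(c0) + A[:, 2]     # -> [ 30.  25.]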