### ### Bayes Filter Demo ### ~~~~~~~~~~~~~~~~~ ### ### (C) 2009 Rodrigo Ventura, ISR / IST ### Email: rodrigo.ventura@isr.ist.utl.pt ### import sys import math import random import Tkinter def p_normal(x, u, sigma2): return math.exp(-(x-u)**2/(2*sigma2)) / math.sqrt(2*math.pi*sigma2) def lo(p): return math.log(p/(1.0-p)) def plo(l): return 1 - 1/(1+math.exp(l)) class Viewer(Tkinter.Tk): VERSION = "1.3 (18-Oct-2009)" ### <<< VERSION STRING N = 20 m, g = None, None default_m = "Particle" ## Interface functions def __init__(self): Tkinter.Tk.__init__(self) self.title("Bayes filter demo "+self.VERSION) Tkinter.Label(self, text="(C) 2009 Rodrigo Ventura, ISR / IST", font="verdana 9").pack(side="bottom", expand=True, anchor="sw") self.canvas = c = Tkinter.Canvas(self, width=120+25*self.N, height=310) c.pack(side="bottom") self.var_loc = Tkinter.IntVar() self.var_map = Tkinter.IntVar() Tkinter.Button(self, text="Init", command=self.init).pack(side="left") Tkinter.Button(self, text="<", command=self.do_left).pack(side="left") Tkinter.Button(self, text="=", command=self.do_stay).pack(side="left") Tkinter.Button(self, text=">", command=self.do_right).pack(side="left") # filter menu mb = Tkinter.Menubutton(self) m = Tkinter.Menu(mb) for name in ("Discrete", "Kalman", "Particle"): m.add_command(label=name, command=lambda n=name: self.set_filter(n)) mb.configure(menu=m) mb.pack(side="left") self.mb = mb self.set_filter(self.default_m) # Tkinter.Checkbutton(self, text="Localization", variable=self.var_loc).pack(side="left") Tkinter.Checkbutton(self, text="Mapping", variable=self.var_map,\ command=self.set_map).pack(side="left") Tkinter.Button(self, text="About", command=self.do_about).pack(side="right") c.bind("", self.mouse_press) # map slots for i in xrange(self.N): self.canvas.create_line(50+i*25, 200, 70+i*25, 200) self.g = OccGrid(self) def set_filter(self, name): if self.m: self.m.cleanup() if name=="Discrete": self.m = Discrete(self) elif name=="Kalman": self.m = Kalman(self) elif name=="Particle": self.m = Particle(self) self.mb.configure(text=name) self.m.init() self.m.update_pdf() def update_robot(self): self.canvas.delete("robot") self.canvas.create_oval(50+self.x*25, 190, 70+self.x*25, 170, tags="robot") def show_measure(self, z): self.canvas.delete("meas") if z: self.canvas.create_text(50, 210, text="z = %.2f"%z, \ fill="blue", anchor="nw", tags="meas") u = 60 + z*25 self.canvas.create_line(u, 150, u, 160, fill="blue", tags="meas") def do_left(self): if self.x>0: if random.random()>0.2: self.x -= 1 self.do_step(-1) def do_stay(self): self.do_step(0) def do_right(self): if self.x0.2: self.x += 1 self.do_step(1) def do_step(self, u): self.update_robot() # localization z = self.measure() if self.var_loc.get() else None self.show_measure(z) self.m.do_filter(u, z) self.m.update_pdf() # mapping if self.var_map.get(): z1,z2 = self.measure(), self.N-1 - self.measure() self.g.do_filter(self.x, z1, z2) self.g.show_map(z1, z2) def set_map(self): if self.var_map.get(): self.g.init() self.g.show_map() else: self.g.cleanup() def mouse_press(self, ev): x, y = ev.x, ev.y if y>=170 and y<=190: i = (x-50)//25 if i>=0 and i\nISR / IST, Lisbon, Portugal").pack(expand=True) ## Core functions def init(self): self.m.init() self.m.update_pdf() self.x = random.randint(0, self.N-1) self.update_robot() self.show_measure(None) if self.var_map.get(): self.g.init() self.g.show_map() def measure(self): """Simulate a measurment""" return random.gauss(self.x, 1) def main(self): self.init() self.mainloop() class Discrete(): ## Interface functions def __init__(self, viewer): self.v = viewer self.N = viewer.N self.v.canvas.create_line(40, 120, 40, 20, fill="#888", arrow="last", tags="axis") def update_pdf(self): self.v.canvas.delete("pdf") for i in xrange(self.N): self.v.canvas.create_rectangle(50+i*25, 120, 70+i*25, 120-100*self.px[i], \ fill="#888", tags="pdf") # draw mean m = 0 for i in xrange(self.N): m += i*self.px[i] u = 50 + (m+0.5)*25 self.v.canvas.create_line(u, 130, u, 140, fill="red", tags="pdf") def cleanup(self): self.v.canvas.delete("pdf", "axis") ## Core functions def init(self): self.px = self.N*[1.0/self.N] def do_filter(self, u, z): """Discrete filter algorithm: given control u and measurment z, updates px distribution""" bel1 = self.N*[0] for j in xrange(self.N): for i in xrange(self.N): bel1[j] += self.p_trans(i, u, j)*self.px[i] if z: bel2 = [] for j in xrange(self.N): bel2.append(self.p_meas(z, j)*bel1[j]) else: bel2 = bel1 eta = 1.0/sum(bel2) self.px = map(lambda p: eta*p, bel2) def p_trans(self, x1, a, x2): """State transition model""" if a==0: return 1 if x1==x2 else 0 else: y = x1+a if y<0 or y>=self.N: return 1 if x1==x2 else 0 else: if x2==y: return 0.8 elif x2==x1: return 0.2 else: return 0 def p_meas(self, z, x): """Measurment model""" sigma2 = 1 return p_normal(z, x, sigma2) class Kalman(): ## Interface functions def __init__(self, viewer): self.v = viewer self.N = viewer.N # plot axis arrows self.v.canvas.create_line(60, 120, 60+25*self.N, 120, fill="#888", arrow="last", tags="axis") self.v.canvas.create_line(60, 120, 60, 20, fill="#888", arrow="last", tags="axis") def update_pdf(self): self.v.canvas.delete("pdf") pts = [] for u in xrange(25*self.N): x = u/25.0 px = p_normal(x, self.ux, self.sx) pts.extend([60+u,120-100*px]) self.v.canvas.create_line(pts, tags="pdf") # draw mean v = 60 + self.ux*25 self.v.canvas.create_line(v, 130, v, 140, fill="red", tags="pdf") def cleanup(self): self.v.canvas.delete("pdf", "axis") ## Core functions def init(self): self.ux = self.N/2.0 self.sx = 2.0*self.N def do_filter(self, cmd, z): """Kalman filter algorithm: given control u and measurment z, updates px distribution""" sigma2_state = 0.16 sigma2_meas = 1.0 u = 0.8*cmd # scaling to get state transition mean value if u==0: u1, s1 = self.ux, self.sx else: # (1) predict step u1 = self.ux + u s1 = self.sx + sigma2_state if z is None: u2, s2 = u1, s1 else: # (2) update step k = s1/(s1+sigma2_meas) u2 = u1 + k*(z-u1) s2 = (1-k)*s1 self.ux, self.sx = u2, s2 class Particle(): M = 100 ## Interface functions def __init__(self, viewer): self.v = viewer self.N = viewer.N # plot axis arrows self.v.canvas.create_line(60, 100, 60+25*self.N, 100, fill="#888", arrow="last", tags="axis") self.v.canvas.create_line(60, 120, 60+25*self.N, 120, fill="#888", arrow="last", tags="axis") self.v.canvas.create_line(60, 100, 60, 20, fill="#888", arrow="last", tags="axis") def update_pdf(self): self.v.canvas.delete("pdf") for (w,x) in self.ox: u = 60+25*x self.v.canvas.create_line(u, 90, u, 90-80*w, fill="#888", tags="pdf") for (w,x) in self.x: u = 60+25*x self.v.canvas.create_line(u, 115, u, 110, tags="pdf") # draw mean m = sum(map(lambda xi: xi[1], self.x)) / len(self.x) u = 50 + (m+0.5)*25 self.v.canvas.create_line(u, 130, u, 140, fill="red", tags="pdf") def cleanup(self): self.v.canvas.delete("pdf", "axis") ## Core functions def init(self): x = [] for i in xrange(self.M): x.append( (1.0, random.uniform(0,self.N-1)) ) self.ox, self.x = [], x def do_filter(self, cmd, z): """Kalman filter algorithm: given control u and measurment z, updates px distribution""" # (1) predict step ox = map(lambda xi: self.predict(xi, cmd), self.x) if z: # (2) update step ox = map(lambda xi: self.update(xi, z), ox) # (3) resample x = self.resample(ox) else: x = ox self.ox, self.x = ox, x def predict(self, xi, u): sigma2_state = 0.16 if u==0: return xi else: (w,x) = xi u *= 0.8 # scaling to get state transition mean value return (w, random.gauss(x+u, sigma2_state)) def update(self, xi, z): sigma2_meas = 1.0 (w,x) = xi w *= p_normal(z, x, sigma2_meas) return (w,x) def resample(self, x): U = 10 s = sum( map(lambda xi: xi[0], x) ) x = map( lambda xi: (xi[0]/s, xi[1]), x ) n = [] for i in xrange(self.M-U): r = random.random() s = 0 for (wi,xi) in x: s += wi if s>=r: c = xi break n.append( (1.0, c) ) for i in xrange(U): n.append( (1.0, random.uniform(0,self.N-1)) ) return n class OccGrid(): def __init__(self, viewer): self.v = viewer self.N = viewer.N self.M = 5*viewer.N + 5*2 + 5*1 self.O = 5*2 #self.v.canvas.create_line(60-25*2, 220, 60+25*(self.N+1), 220) def init(self): self.lx = self.M*[0] def cleanup(self): self.v.canvas.delete("map") def show_map(self, z1=None, z2=None): self.v.canvas.delete("map") for i in xrange(self.M): x = 60 + 5*(i-self.O) p = plo(self.lx[i]) v = 255 - round(255*max(0,min(1,p))) c = "#%02x%02x%02x"%(v,v,v) self.v.canvas.create_rectangle(x, 245, x+5, 250, \ width=0, fill=c, tags="map") if z1: self.v.canvas.create_line(60+25*(self.v.x-z1), 240, \ 60+25*self.v.x-5, 240, \ dash=".", fill="red", tags="map") if z2: self.v.canvas.create_line(60+25*(self.v.x+z2), 240, \ 60+25*self.v.x+5, 240, \ dash=".", fill="red", tags="map") def do_filter(self, x, z1, z2): for i in xrange(self.M): m = 0.2*(i-self.O) if m