# -*- coding: utf-8 -*-
# pylint: disable=invalid-name, too-many-arguments, too-many-branches,
# pylint: disable=too-many-locals, too-many-instance-attributes, too-many-lines
"""
This module implements the linear Kalman filter in both an object
oriented and procedural form.
The KalmanFilter class implements the filter by storing the various matrices in instance variables, minimizing the amount of bookkeeping you have to do.
All Kalman filters operate with a predict->update cycle. The
predict step, implemented with the method or function predict(),
uses the state transition matrix F to predict the state in the next
time period (epoch). The state is stored as a gaussian (x, P), where
x is the state (column) vector, and P is its covariance. Covariance
matrix Q specifies the process covariance. In Bayesian terms, this
prediction is called the *prior*, which you can think of colloquially
as the estimate prior to incorporating the measurement.
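Restated as equations (this mirrors the implementation of predict() in
this module; alpha is the optional fading-memory factor, 1.0 by default):

.. code-block:: Python

    x_prior = F @ x + B @ u                   # predicted state
    P_prior = alpha**2 * (F @ P @ F.T) + Q    # predicted covariance
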
The update step, implemented with the method or function `update()`,
incorporates the measurement z with covariance R, into the state
estimate (x, P). The class stores the system uncertainty in S,
the innovation (residual between prediction and measurement in
measurement space) in y, and the Kalman gain in K. The procedural
form returns these variables to you. In Bayesian terms this computes
the *posterior* - the estimate after the information from the
measurement is incorporated.
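Restated as equations (mirroring the implementation of update() in this
module):

.. code-block:: Python

    y = z - H @ x                    # innovation (residual)
    S = H @ P @ H.T + R              # system uncertainty
    K = P @ H.T @ inv(S)             # Kalman gain
    x = x + K @ y                    # posterior state estimate
    P = (I - K @ H) @ P @ (I - K @ H).T + K @ R @ K.T   # posterior covariance
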
Whether you use the OO form or procedural form is up to you. If
matrices such as H, R, and F are changing each epoch, you'll probably
opt to use the procedural form. If they are unchanging, the OO
form is perhaps easier to use since you won't need to keep track
of these matrices. This is especially useful if you are implementing
banks of filters or comparing various KF designs for performance;
a trivial coding bug could lead to using the wrong sets of matrices.
This module also offers an implementation of the RTS smoother, and
other helper functions, such as log likelihood computations.
The Saver class allows you to easily save the state of the
KalmanFilter class after every update.
This module expects NumPy arrays for all values that expect
arrays, although in a few cases, particularly method parameters,
it will accept types that convert to NumPy arrays, such as lists
of lists. These exceptions are documented in the method or function.
Examples
--------
The following example constructs a constant velocity kinematic
filter, filters noisy data, and plots the results. It also demonstrates
using the Saver class to save the state of the filter at each epoch.
.. code-block:: Python

    import matplotlib.pyplot as plt
    import numpy as np
    from numpy.random import randn

    from filterpy.kalman import KalmanFilter
    from filterpy.common import Q_discrete_white_noise, Saver

    r_std, q_std = 2., 0.003
    dt = 1.                               # time step (1 second in this example)
    cv = KalmanFilter(dim_x=2, dim_z=1)
    cv.x = np.array([[0.], [1.]])         # position, velocity
    cv.F = np.array([[1., dt],
                     [0., 1.]])
    cv.R = np.array([[r_std**2]])
    cv.H = np.array([[1., 0.]])
    cv.P = np.diag([.1**2, .03**2])
    cv.Q = Q_discrete_white_noise(2, dt, q_std**2)

    saver = Saver(cv)
    for z in range(100):
        cv.predict()
        cv.update([z + randn() * r_std])
        saver.save()  # save the filter's state

    saver.to_array()
    plt.plot(saver.x[:, 0])

    # plot all of the priors
    plt.plot(saver.x_prior[:, 0])

    # plot mahalanobis distance
    plt.figure()
    plt.plot(saver.mahalanobis)
This code implements the same filter using the procedural form:

.. code-block:: Python

    from filterpy.kalman import predict, update

    x = np.array([[0.], [1.]])            # position, velocity
    F = np.array([[1., dt],
                  [0., 1.]])
    R = np.array([[r_std**2]])
    H = np.array([[1., 0.]])
    P = np.diag([.1**2, .03**2])
    Q = Q_discrete_white_noise(2, dt, q_std**2)

    xs = []
    for z in range(100):
        x, P = predict(x, P, F=F, Q=Q)
        x, P = update(x, P, z=[z + randn() * r_std], R=R, H=H)
        xs.append(x[0, 0])
    plt.plot(xs)
For more examples see the test subdirectory, or refer to the
book cited below. In it I both teach Kalman filtering from basic
principles, and teach the use of this library in great detail.
FilterPy library.
http://github.com/rlabbe/filterpy
Documentation at:
https://filterpy.readthedocs.org
Supporting book at:
https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python
This is licensed under an MIT license. See the readme.MD file
for more information.
Copyright 2014-2018 Roger R Labbe Jr.
"""
from __future__ import absolute_import, division
from copy import deepcopy
from math import log, exp, sqrt
import sys
import warnings
import numpy as np
from numpy import dot, zeros, eye, isscalar, shape
import numpy.linalg as linalg
from filterpy.stats import logpdf
from filterpy.common import pretty_str, reshape_z
class KalmanFilter(object):
r""" Implements a Kalman filter. You are responsible for setting the
various state variables to reasonable values; the defaults will not give
you a functional filter.
In brief: first construct the object, specifying the size of the state
vector with dim_x and the size of the measurement vector z with dim_z.
Matrix shapes are checked in many places at runtime. For example, if you
set dim_z=2 and then assign a 3x3 matrix to R (the measurement noise
covariance), you will get an assertion error.
After construction the filter has default values for every attribute, but
you still need to set each one yourself. This is easy: just assign the
appropriate value to each variable. All assigned values must be of type
numpy.array. An example follows.
Examples
--------
Here is a filter that tracks position and velocity using a sensor that
only reads position.
First construct the object with the required dimensionality:

.. code::

    from filterpy.kalman import KalmanFilter
    f = KalmanFilter(dim_x=2, dim_z=1)

Then assign a value to each relevant matrix. Assign the initial state:

.. code::

    f.x = np.array([[2.],    # position
                    [0.]])   # velocity

or just use a one dimensional array, which works as well:

.. code::

    f.x = np.array([2., 0.])

Define the state transition matrix F:

.. code::

    f.F = np.array([[1., 1.],
                    [0., 1.]])

Define the measurement matrix H:

.. code::

    f.H = np.array([[1., 0.]])

Define the initial covariance matrix P. Here I take advantage of the fact
that P is diagonal and already initialized to np.eye(dim_x), so I only
need to multiply by a value expressing the uncertainty:

.. code::

    f.P *= 1000.

I could have written instead:

.. code::

    f.P = np.array([[1000.,    0.],
                    [   0., 1000.]])

Which of the two is easier to understand and more readable is up to you.
Now assign the measurement noise. Here it is a 1x1 matrix, so I can use a
scalar:

.. code::

    f.R = 5

I could have assigned it in matrix form instead:

.. code::

    f.R = np.array([[5.]])

Note that when given in matrix form, R must be a two-dimensional 1x1
matrix.
Finally, assign the process noise matrix Q. Here I take advantage of
another FilterPy function:

.. code::

    from filterpy.common import Q_discrete_white_noise
    f.Q = Q_discrete_white_noise(dim=2, dt=0.1, var=0.13)

Now just perform the standard predict/update loop:

.. code::

    while some_condition_is_true:
        z = get_sensor_reading()
        f.predict()
        f.update(z)

        do_something_with_estimate(f.x)

**Procedural Form**

This module also contains stand alone functions to perform Kalman filtering.
Use these if you are not a fan of objects.
**Example**

.. code::

    while True:
        z, R = read_sensor()
        x, P = predict(x, P, F, Q)
        x, P = update(x, P, z, R, H)

See my book Kalman and Bayesian Filters in Python [2]_.
You will have to set the following attributes after constructing this
object for the filter to perform properly. Although there are a variety
of checks in place to make sure you have set everything to the 'correct'
size, it is still possible to provide incorrectly sized arrays such that
the linear algebra can not compute the result.
It can also fail silently: you can end up with matrices of a size that
the linear algebra can work with, but that are the wrong shape for the
problem you are trying to solve.
Parameters
----------
dim_x : int
    Number of state variables for the Kalman filter. For example, if
    you are tracking the position and velocity of an object in two
    dimensions, dim_x would be 4.
    This is used to set the default size of P, Q, and u.
dim_z : int
    Number of measurement inputs, i.e. the size of the measurement
    vector z. For example, if the sensor provides position as (x, y),
    dim_z would be 2.
dim_u : int (optional)
    Size of the control input vector. Defaults to 0 (no control input
    is used).
Attributes
----------
x : numpy.array(dim_x, 1)
    Current state estimate. Any call to update() or predict() updates
    this variable.
P : numpy.array(dim_x, dim_x)
    Current state covariance matrix. Any call to update() or predict()
    updates this variable.
x_prior : numpy.array(dim_x, 1)
    Prior (predicted) state estimate. The *_prior and *_post attributes
    are for convenience; they store the prior and posterior of the
    current epoch. Read Only.
P_prior : numpy.array(dim_x, dim_x)
    Prior (predicted) state covariance matrix. Read Only.
x_post : numpy.array(dim_x, 1)
    Posterior (updated) state estimate, i.e. the optimal state estimate.
    Read Only.
P_post : numpy.array(dim_x, dim_x)
    Posterior (updated) state covariance matrix. Read Only.
z : numpy.array
    Last measurement used in update(). Read Only.
R : numpy.array(dim_z, dim_z)
    Measurement noise covariance matrix.
Q : numpy.array(dim_x, dim_x)
    Process noise covariance matrix.
F : numpy.array()
    State transition matrix.
H : numpy.array(dim_z, dim_x)
    Measurement (observation) matrix.
y : numpy.array
    Residual of the update step: the difference between the measurement
    and the predicted measurement. Read Only.
K : numpy.array(dim_x, dim_z)
    Kalman gain of the update step. Read Only.
S : numpy.array
    System uncertainty: P projected into measurement space,
    S = H @ P @ H.T + R. Read Only.
SI : numpy.array
    Inverse of the system uncertainty S. Read Only.
log_likelihood : float
    Log-likelihood of the last measurement. Read Only.
likelihood : float
    Likelihood of the last measurement. Read Only.
    Computed from the log-likelihood. The log-likelihood can be very
    small, e.g. -28000; exp(-28000) evaluates to 0.0, which can break
    downstream computations that use this value, so in that case
    sys.float_info.min is returned instead.
mahalanobis : float
    Mahalanobis distance of the innovation. Read Only.
inv : function, default numpy.linalg.inv
    Function used to compute the matrix inverse. If you prefer another
    inverse function, such as the Moore-Penrose pseudo inverse, set it
    instead:
    kf.inv = np.linalg.pinv
    This is only used to compute the inverse of self.S. If you know that
    self.S is diagonal, you might choose to set it to
    filterpy.common.inv_diagonal, which is significantly faster than
    numpy.linalg.inv for diagonal matrices.
alpha : float
    Fading memory setting. 1.0 (the default) gives the normal Kalman
    filter, and values slightly larger than 1.0 (such as 1.02) give a
    fading memory effect - previous measurements have less influence on
    the filter's estimates. This formulation of the fading memory filter
    (there are many) is due to Dan Simon [1]_.
References
----------
.. [1] Dan Simon. "Optimal State Estimation." John Wiley & Sons.
p. 208-212. (2006)
.. [2] Roger Labbe. "Kalman and Bayesian Filters in Python"
https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python
"""
def __init__(self, dim_x, dim_z, dim_u=0):
# validate the input dimensions
if dim_x < 1:
raise ValueError('dim_x must be 1 or greater')
if dim_z < 1:
raise ValueError('dim_z must be 1 or greater')
if dim_u < 0:
raise ValueError('dim_u must be 0 or greater')
# state vector dimension
self.dim_x = dim_x
# measurement vector dimension
self.dim_z = dim_z
# control input dimension
self.dim_u = dim_u
# state vector
self.x = zeros((dim_x, 1)) # state
# state uncertainty covariance
self.P = eye(dim_x) # uncertainty covariance
# process noise covariance
self.Q = eye(dim_x) # process uncertainty
# control transition matrix: maps the control input into state space
self.B = None # control transition matrix
# state transition matrix
self.F = eye(dim_x) # state transition matrix
# measurement matrix: maps the state into measurement space
self.H = zeros((dim_z, dim_x)) # Measurement function
# measurement noise covariance
self.R = eye(dim_z) # measurement uncertainty
# fading memory control factor
self._alpha_sq = 1. # fading memory control
self.M = np.zeros((dim_z, dim_z)) # process-measurement cross correlation
# last measurement vector (supplied by the sensor, not computed by the filter)
self.z = np.array([[None]*self.dim_z]).T
# gain and residual are computed during the innovation step. We
# save them so that in case you want to inspect them for various
# purposes
# Kalman gain
self.K = np.zeros((dim_x, dim_z)) # kalman gain
# residual: y = z - dot(H, self.x)
self.y = zeros((dim_z, 1))
# system uncertainty
self.S = np.zeros((dim_z, dim_z)) # system uncertainty
# inverse of S
self.SI = np.zeros((dim_z, dim_z)) # inverse system uncertainty
# identity matrix. Do not alter this.
self._I = np.eye(dim_x)
# these will always be a copy of x,P after predict() is called
# prior estimate of x
self.x_prior = self.x.copy()
# prior estimate of P
self.P_prior = self.P.copy()
# these will always be a copy of x,P after update() is called
# posterior estimate of x
self.x_post = self.x.copy()
# posterior estimate of P
self.P_post = self.P.copy()
# only computed if requested via the corresponding property
# log-likelihood
self._log_likelihood = log(sys.float_info.min)
# likelihood
self._likelihood = sys.float_info.min
# Mahalanobis distance
self._mahalanobis = None
# matrix inversion function, numpy.linalg.inv by default
self.inv = np.linalg.inv
def predict(self, u=None, B=None, F=None, Q=None):
"""
Predict next state (prior) using the Kalman filter state propagation
equations.
Parameters
----------
u : np.array
    Optional control input. If not None, it is multiplied by B to form
    the control contribution B u added to the state.
B : np.array(dim_x, dim_u), or None
    Optional control transition matrix. If None, self.B is used.
F : np.array(dim_x, dim_x), or None
    Optional state transition matrix. If None, self.F is used.
Q : np.array(dim_x, dim_x), scalar, or None
    Optional process noise covariance. If None, self.Q is used; a
    scalar is expanded into eye(dim_x) * Q.
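Examples
--------
A minimal sketch; ``kf`` is assumed to be an already configured
KalmanFilter and ``u_cmd`` a hypothetical control vector of size dim_u
(using it requires self.B to be set):

.. code::

    kf.predict()             # roll x, P forward using self.F and self.Q
    kf.predict(u=u_cmd)      # same, but adds the control contribution B @ u_cmd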
"""
if B is None:
B = self.B
if F is None:
F = self.F
if Q is None:
Q = self.Q
elif isscalar(Q):
Q = eye(self.dim_x) * Q
# compute the prior estimate of x:
# x = Fx + Bu
if B is not None and u is not None:
self.x = dot(F, self.x) + dot(B, u)
else:
self.x = dot(F, self.x)
# compute the prior estimate of P:
# P = FPF' + Q
# self._alpha_sq is the fading-memory factor alpha**2 described above
self.P = self._alpha_sq * dot(dot(F, self.P), F.T) + Q
# save prior
self.x_prior = self.x.copy()
self.P_prior = self.P.copy()
def update(self, z, R=None, H=None):
"""
Add a new measurement (z) to the Kalman filter.
If z is None, nothing is computed. However, x_post and P_post are
updated with the prior (x, P), and self.z is set to None.
Parameters
----------
z : (dim_z, 1): array_like
    measurement for this update. z can be a scalar if dim_z is 1,
    otherwise it must be convertible to a column vector.
R : np.array, scalar, or None
    Optional measurement noise covariance. If None, self.R is used; a
    scalar is expanded into eye(dim_z) * R.
H : np.array, or None
    Optional measurement matrix. If None, self.H is used.
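Examples
--------
A minimal sketch; ``kf`` is assumed to be an already configured
KalmanFilter with dim_z=1 and ``read_sensor()`` a hypothetical
measurement source:

.. code::

    kf.predict()
    kf.update(read_sensor())   # incorporate the measurement
    kf.update(None)            # no measurement: posterior is a copy of the prior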
"""
# set to None to force recompute
# of the likelihood and Mahalanobis distance
self._log_likelihood = None
self._likelihood = None
self._mahalanobis = None
# if z is None, the posteriors are simply copies of the priors
if z is None:
self.z = np.array([[None]*self.dim_z]).T
self.x_post = self.x.copy()
self.P_post = self.P.copy()
self.y = zeros((self.dim_z, 1))
return
# reshape z into a vector compatible with the filter's dimensions
z = reshape_z(z, self.dim_z, self.x.ndim)
if R is None:
R = self.R
elif isscalar(R):
R = eye(self.dim_z) * R
if H is None:
H = self.H
# compute the residual y:
# y = z - Hx
# error (residual) between measurement and prediction
self.y = z - dot(H, self.x)
# common subexpression for speed
PHT = dot(self.P, H.T)
# compute the system uncertainty S:
# S = HPH' + R
# project system uncertainty into measurement space
self.S = dot(H, PHT) + R
self.SI = self.inv(self.S)
# compute the Kalman gain:
# K = PH'inv(S)
# map system uncertainty into kalman gain
self.K = dot(PHT, self.SI)
# compute the posterior (optimal) state estimate:
# x = x + Ky
# predict new x with residual scaled by the kalman gain
self.x = self.x + dot(self.K, self.y)
# P = (I-KH)P(I-KH)' + KRK'
# This is more numerically stable
# and works for non-optimal K vs the equation
# P = (I-KH)P usually seen in the literature.
# compute the posterior covariance estimate:
I_KH = self._I - dot(self.K, H)
self.P = dot(dot(I_KH, self.P), I_KH.T) + dot(dot(self.K, R), self.K.T)
# save measurement and posterior state
self.z = deepcopy(z)
self.x_post = self.x.copy()
self.P_post = self.P.copy()
def predict_steadystate(self, u=0, B=None):
"""
Predict the prior state estimate x using the Kalman filter state
propagation equations. Only x is updated; P is left unchanged.
Parameters
----------
u : np.array
    Optional control input vector.
B : np.array(dim_x, dim_u), or None
    Optional control transition matrix. If None, self.B is used.
"""
if B is None:
B = self.B
# x = Fx + Bu
if B is not None:
self.x = dot(self.F, self.x) + dot(B, u)
else:
self.x = dot(self.F, self.x)
# save prior
self.x_prior = self.x.copy()
self.P_prior = self.P.copy()
def update_steadystate(self, z):
"""
Add a new measurement (z) to the Kalman filter without recomputing the
Kalman gain K, the state covariance P, or the system uncertainty S.
You can use this for linear time-invariant systems, since the Kalman
gain converges to a constant value. Precompute these values and assign
them explicitly, or run the filter with predict/update for a while
until they converge.
The main advantage of this call is speed: we do significantly less
computation, notably avoiding an expensive matrix inversion.
Use in conjunction with predict_steadystate(); otherwise P will grow
without bound.
Parameters
----------
z : (dim_z, 1): array_like
    measurement for this update. z can be a scalar if dim_z is 1,
    otherwise it must be convertible to a column vector.
Examples
--------
>>> cv = kinematic_kf(dim=3, order=2) # 3D const velocity filter
>>> # let filter converge on representative data, then save k and P
>>> for i in range(100):
>>> cv.predict()
>>> cv.update([i, i, i])
>>> saved_K = np.copy(cv.K)
>>> saved_P = np.copy(cv.P)
later on:
>>> cv = kinematic_kf(dim=3, order=2) # 3D const velocity filter
>>> cv.K = np.copy(saved_K)
>>> cv.P = np.copy(saved_P)
>>> for i in range(100):
>>> cv.predict_steadystate()
>>> cv.update_steadystate([i, i, i])
"""
# set to None to force recompute
self._log_likelihood = None
self._likelihood = None
self._mahalanobis = None
if z is None:
self.z = np.array([[None]*self.dim_z]).T
self.x_post = self.x.copy()
self.P_post = self.P.copy()
self.y = zeros((self.dim_z, 1))
return
z = reshape_z(z, self.dim_z, self.x.ndim)
# y = z - Hx
# error (residual) between measurement and prediction
self.y = z - dot(self.H, self.x)
# the Kalman gain K and the posterior covariance P are not recomputed here
# x = x + Ky
# predict new x with residual scaled by the kalman gain
self.x = self.x + dot(self.K, self.y)
self.z = deepcopy(z)
self.x_post = self.x.copy()
self.P_post = self.P.copy()
# set to None to force recompute
self._log_likelihood = None
self._likelihood = None
self._mahalanobis = None
def get_prediction(self, u=0):
"""
Predicts the next state of the filter and returns it, without altering
the state of the filter.
Parameters
----------
u : np.array
optional control input
Returns
-------
(x, P) : tuple
State vector and covariance array of the prediction.
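Examples
--------
A minimal sketch; ``kf`` is assumed to be an already configured
KalmanFilter. The filter's own x and P are left untouched:

.. code::

    x_pred, P_pred = kf.get_prediction()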
"""
# if no control transition matrix is set, skip the control term
if self.B is not None:
    x = dot(self.F, self.x) + dot(self.B, u)
else:
    x = dot(self.F, self.x)
P = self._alpha_sq * dot(dot(self.F, self.P), self.F.T) + self.Q
return (x, P)
def get_update(self, z=None):
"""
Computes the new estimate based on measurement z and returns it,
without altering the state of the filter.
Parameters
----------
z : (dim_z, 1): array_like
measurement for this update. z can be a scalar if dim_z is 1,
otherwise it must be convertible to a column vector.
Returns
-------
(x, P) : tuple
State vector and covariance array of the update.
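Examples
--------
A minimal sketch; ``kf`` and the measurement ``z`` are assumed to exist.
The filter's own x and P are left untouched:

.. code::

    x_new, P_new = kf.get_update(z)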
"""
if z is None:
return self.x, self.P
z = reshape_z(z, self.dim_z, self.x.ndim)
R = self.R
H = self.H
P = self.P
x = self.x
# error (residual) between measurement and prediction
y = z - dot(H, x)
# common subexpression for speed
PHT = dot(P, H.T)
# project system uncertainty into measurement space
S = dot(H, PHT) + R
# map system uncertainty into kalman gain
K = dot(PHT, self.inv(S))
# predict new x with residual scaled by the kalman gain
x = x + dot(K, y)
# P = (I-KH)P(I-KH)' + KRK'
I_KH = self._I - dot(K, H)
P = dot(dot(I_KH, P), I_KH.T) + dot(dot(K, R), K.T)
return x, P
def residual_of(self, z):
"""
Returns the residual between the measurement z and the prior
measurement estimate (H @ x_prior).
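A minimal sketch; ``kf`` and the measurement ``z`` are assumed to exist:

.. code::

    kf.predict()
    r = kf.residual_of(z)   # z - H @ x_prior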
"""
return z - dot(self.H, self.x_prior)
def measurement_of_state(self, x):
"""
Helper function that converts a state into a measurement.
Parameters
----------
x : np.array
kalman state vector
Returns
-------
z : (dim_z, 1): array_like
measurement for this update. z can be a scalar if dim_z is 1,
otherwise it must be convertible to a column vector.
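Examples
--------
A minimal sketch; ``kf`` is assumed to be configured and ``some_state``
is a hypothetical state vector of shape (dim_x, 1):

.. code::

    z_expected = kf.measurement_of_state(some_state)   # H @ some_state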
"""
return dot(self.H, x)
@property
def log_likelihood(self):
"""
Log-likelihood of the last measurement.
"""
if self._log_likelihood is None:
self._log_likelihood = logpdf(x=self.y, cov=self.S)
return self._log_likelihood
@property
def likelihood(self):
"""
Likelihood of the last measurement. Computed from the log-likelihood;
if the value underflows to 0.0, sys.float_info.min is returned instead.
"""
if self._likelihood is None:
self._likelihood = exp(self.log_likelihood)
if self._likelihood == 0:
self._likelihood = sys.float_info.min
return self._likelihood
@property
def mahalanobis(self):
""""
Mahalanobis distance of measurement. E.g. 3 means measurement
was 3 standard deviations away from the predicted value.
Returns
-------
mahalanobis : float
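Examples
--------
A sketch of simple outlier flagging; ``kf`` and ``z`` are assumed to
exist, and the 3-sigma threshold is an arbitrary illustrative choice:

.. code::

    kf.predict()
    kf.update(z)
    if kf.mahalanobis > 3:
        print('measurement was > 3 std devs from the prediction')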
"""
if self._mahalanobis is None:
self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y)))
return self._mahalanobis
@property
def alpha(self):
"""
Fading memory setting. 1.0 gives the normal Kalman filter; values
slightly greater than 1.0 (such as 1.02) give a fading memory effect,
so that older measurements have less influence on the filter's current
estimate.
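For example, a slight fading-memory effect (value chosen only for
illustration):

.. code::

    kf.alpha = 1.02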
"""
return self._alpha_sq**.5
def log_likelihood_of(self, z):
"""
log likelihood of the measurement `z`. This should only be called
after a call to update(). Calling after predict() will yield an
incorrect result."""
if z is None:
return log(sys.float_info.min)
return logpdf(z, dot(self.H, self.x), self.S)
@alpha.setter
def alpha(self, value):
if not np.isscalar(value) or value < 1:
raise ValueError('alpha must be a float greater than or equal to 1')
self._alpha_sq = value**2