#!/usr/bin/env python
# encoding: utf-8
# Copyright 2010 Xinyu, He <legendmohe@foxmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import time
import subprocess
import json
from Queue import Queue, Empty
import collections
import logging
import zmq
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
DEBUG = logging.debug
INFO = logging.info
WARN = logging.warning
ERROR = logging.error
CRITICAL = logging.critical
class tag_endpoint(object):
# tag 的蓝牙地址
tag_addrs = ["E2C56DB5-DFFB-48D2-B060-D0F5A71096E0"]
def __init__(self, name):
self.server_ip = "tcp://*:8006"
self.name = name
self.tags = {}
self._queues = {}
self._init_fliter()
def _init_fliter(self):
self.fliter = {}
N = 5.0
A = 1.0
for tag_addr in tag_endpoint.tag_addrs:
self.fliter[tag_addr] = {}
self.fliter[tag_addr]['N'] = N
self.fliter[tag_addr]['A'] = A
self.fliter[tag_addr]['S'] = \
self.fliter[tag_addr]['N']*self.fliter[tag_addr]['A']
self.fliter[tag_addr]['queue'] = collections.deque([1]*10, maxlen=10)
# 网上找的一个计算距离的公式,不太准
def calDistance(self, txPower, rssi):
if rssi == 0:
return 0.0
ratio = rssi*1.0/txPower
if ratio < 1.0:
return ratio**10
else:
accuracy = 0.89976*(ratio**7.7095) + 0.111
return accuracy
# 窗口长度为10,去除最高和最低值后,减去上一次的平均值再加上窗口的平均值
def rssi_fliter(self, addr, rssi):
if addr not in self.fliter:
return -1
fliter = self.fliter[addr]
if fliter is None:
return -1
fliter['queue'].append(rssi)
ordered_queue = list(fliter['queue'])
ordered_queue.sort()
ordered_queue = ordered_queue[1:-1]
C = sum(ordered_queue)/len(ordered_queue)
fliter['S'] = fliter['S'] - fliter['A'] + C
fliter['A'] = fliter['S']/fliter['N']
return fliter['A']
# 依赖于ibeacon_scan这个bash脚本
def _fetch_rssi(self):
subprocess.call(
["sudo", "killall", "-9", "hcitool"],
)
subprocess.call(
["./vender/ibeacon_scan"],
)
proc = subprocess.Popen(
["./vender/ibeacon_scan", "-b"],
stdout=subprocess.PIPE)
while True :
try:
data = proc.stdout.readline() #block / wait
# print data
if data == "":
print "no broadcast data."
break
datas = data.split()
addr = datas[0]
if addr in self._queues:
# 将收集的数据放进缓冲队列里
self._queues[addr].put(datas)
except Exception, ex:
ERROR(ex)
time.sleep(3)
def _parse_rssi(self, addr):
queue = self._queues[addr]
while True:
try:
# 从缓冲队列里取出数据
datas = queue.get(timeout=10)
queue.task_done()
txPower = int(datas[3])
rssi = int(datas[4])
rssi = self.rssi_fliter(addr, rssi)
self.tags[addr] = self.calDistance(txPower, rssi)
except Empty:
self.tags[addr] = -1.0
INFO('parse rssi timeout.')
except Exception, ex:
self.tags[addr] = -1.0
ERROR(ex)
time.sleep(3)
INFO("addr:%s, distance:%f" % (addr, self.tags[addr]))
def start(self):
INFO("start tag server...")
INFO("bind to" + self.server_ip)
fetch_t = threading.Thread(
target=self._fetch_rssi
)
fetch_t.daemon = True
fetch_t.start()
# 当其中一个ibeacon广播超时的时候,要在程序里标记这个状态。若只用一个
# 线程来统一处理接受到的广播包,那么对某一个ibeacon的超时检测的逻辑就
# 会比较复杂。所以分开几条线程来处理,每条线程负责管理某个ibeacon的状态
for addr in tag_endpoint.tag_addrs:
self._queues[addr] = Queue() # queue for each tag
parse_t = threading.Thread(
target=self._parse_rssi,
args=(addr, )
)
parse_t.daemon = True
parse_t.start()
context = zmq.Context()
self.socket = context.socket(zmq.REP)
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.bind(self.server_ip)
time.sleep(0.5)
while True:
try:
poller = zmq.Poller()
poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT)
if poller.poll(5*1000):
req = self.socket.recv_string()
# rep = "%s#%s" % (self.name, str(self.distance))
rep = {"res": "error"}
if req in tag_endpoint.tag_addrs:
distance = self.tags[req]
status = "unknown" if distance < 0 else "normal"
rep = {"res":
{
"name": self.name,
"distance": distance,
"status": status,
}
}
self.socket.send_string(json.dumps(rep))
INFO("recv req:" + req)
except (KeyboardInterrupt, SystemExit):
self.socket.close()
self.stop()
raise
except Exception, ex:
ERROR(ex)
time.sleep(3)
def stop(self):
subprocess.call(
["sudo", "killall", "-9", "hcitool"],
)
print "tag_endpoint stop."
if __name__ == '__main__':
tag_endpoint("test").start()
其中ibeacon_scan这个bash脚本内容如下:
#!/bin/bash
# iBeacon Scan by Radius Networks
if [[ $1 == "parse" ]]; then
packet=""
capturing=""
count=0
while read line
do
count=$[count + 1]
if [ "$capturing" ]; then
if [[ $line =~ ^[0-9a-fA-F]{2}\ [0-9a-fA-F] ]]; then
packet="$packet $line"
else
if [[ $packet =~ ^04\ 3E\ 2A\ 02\ 01\ .{26}\ 02\ 01\ .{14}\ 02\ 15 ]]; then
UUID=`echo $packet | sed 's/^.\{69\}\(.\{47\}\).*$/\1/'`
MAJOR=`echo $packet | sed 's/^.\{117\}\(.\{5\}\).*$/\1/'`
MINOR=`echo $packet | sed 's/^.\{123\}\(.\{5\}\).*$/\1/'`
POWER=`echo $packet | sed 's/^.\{129\}\(.\{2\}\).*$/\1/'`
UUID=`echo $UUID | sed -e 's/\ //g' -e 's/^\(.\{8\}\)\(.\{4\}\)\(.\{4\}\)\(.\{4\}\)\(.\{12\}\)$/\1-\2-\3-\4-\5/'`
MAJOR=`echo $MAJOR | sed 's/\ //g'`
MAJOR=`echo "ibase=16; $MAJOR" | bc`
MINOR=`echo $MINOR | sed 's/\ //g'`
MINOR=`echo "ibase=16; $MINOR" | bc`
POWER=`echo "ibase=16; $POWER" | bc`
POWER=$[POWER - 256]
RSSI=`echo $packet | sed 's/^.\{132\}\(.\{2\}\).*$/\1/'`
RSSI=`echo "ibase=16; $RSSI" | bc`
RSSI=$[RSSI - 256]
if [[ $2 == "-b" ]]; then
echo "$UUID $MAJOR $MINOR $POWER $RSSI"
else
echo "UUID: $UUID MAJOR: $MAJOR MINOR: $MINOR POWER: $POWER RSSI: $RSSI"
fi
fi
capturing=""
packet=""
fi
fi
if [ ! "$capturing" ]; then
if [[ $line =~ ^\> ]]; then
packet=`echo $line | sed 's/^>.\(.*$\)/\1/'`
capturing=1
fi
fi
done
else
sudo hcitool -i hci0 lescan --duplicates 1>/dev/null &
if [ "$(pidof hcitool)" ]; then
sudo hcidump --raw | ./$0 parse $1
fi
fi