# Copyright 2018-2020 Drexel University
# Author: Geoffrey Mainland <mainland@drexel.edu>
"""Support for communicating with gpsd using asyncio"""
import asyncio
import json
import json.decoder
import logging
import time
import dateutil.parser
logger = logging.getLogger('gpsd')
DEFAULT_GPSD_SERVER = '127.0.0.1'
DEFAULT_GPSD_PORT = 6000
[docs]class GPSDClient:
"""A client for communicating with gpsd"""
def __init__(self, loc,
loop=None,
server_host=DEFAULT_GPSD_SERVER,
server_port=DEFAULT_GPSD_PORT):
self.loc = loc
self.loop = loop
self.server_host = server_host
self.server_port = server_port
self.reader = None
self.writer = None
self.gpsd_task = loop.create_task(self.run())
[docs] async def stop(self):
"""Stop gpsd reader"""
self.gpsd_task.cancel()
await self.gpsd_task
if self.writer:
self.writer.close()
self.writer = None
[docs] async def connect(self):
"""Connect to gpsd"""
self.reader, self.writer = await asyncio.open_connection(self.server_host,
self.server_port,
loop=self.loop)
[docs] async def watch(self, enable):
"""Set whether or not to watch GPS info"""
if enable:
e = 'true'
else:
e = 'false'
command = '?WATCH={{"enable":{0},"json":true}}'.format(e)
self.writer.write(bytes(command, encoding='utf-8'))
[docs] async def run(self):
"""Run gpsd listener"""
wait_to_connect = False
while True:
try:
if wait_to_connect:
await asyncio.sleep(1)
wait_to_connect = False
await self.connect()
await self.watch(True)
while True:
raw = await self.reader.readline()
data = json.loads(raw.decode(encoding='utf-8'))
if data['class'] == 'TPV':
t = time.time()
if 'time' in data:
dt = dateutil.parser.parse(data['time'])
t = time.mktime(dt.timetuple())
else:
t = time.time()
for attr in ['lat', 'lon', 'alt']:
if attr in data:
setattr(self.loc, attr, data[attr])
self.loc.timestamp = t
except ConnectionError:
wait_to_connect = True
except json.decoder.JSONDecodeError:
wait_to_connect = True
except asyncio.CancelledError:
return
except: # pylint: disable=bare-except
logger.exception('Could not obtain GPS location')
break