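# osr_to_xtoys: WebSocket-to-serial bridge (listens on localhost:8088 and
# forwards position/speed messages as commands to a serial port).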
import serial
import time
import asyncio
import websockets
import json
import threading
# Serial link to the device: COM3 at 115200 baud. Supplying the port to the
# constructor opens it right away, so no separate open() call is needed.
ser = serial.Serial(port='COM3', baudrate=115200)
# Shared state: assigned by the websocket handler, read by the
# speed_background worker thread. Everything is unset until a message arrives.
speed = upper = lower = mode = None
# Line terminator appended to every command written to the serial port.
header = '\r\n'
def _tcode_magnitude(percent):
    """Return the three-digit magnitude string for a position in percent.

    The value is rounded and clamped to 0-100 so that floats and
    out-of-range values still yield a well-formed command.
    """
    clamped = max(0, min(100, int(round(percent))))
    # The digits are tenths of a percent (5 -> "050", 50 -> "500"), the same
    # scale the position mode uses; full travel is sent as 999 because 1000
    # would need a fourth digit.
    # NOTE(review): assumes TCode-style fractional magnitudes - confirm
    # against the device firmware.
    return "999" if clamped == 100 else f"{clamped * 10:03d}"


def _send_stroke(percent, msspeed):
    """Write one 'move axis L0 to percent with speed msspeed' command."""
    mess = "L0" + _tcode_magnitude(percent) + "S" + str(msspeed) + header
    print("message: " + mess)
    ser.write(mess.encode())


def speed_background():
    """Generate continuous up/down strokes from the shared speed settings.

    Runs forever and is meant to be the target of a worker thread. Each
    cycle takes a snapshot of the module-level ``upper``, ``lower`` and
    ``speed`` values, sends a move to the upper limit, waits, sends a move
    to the lower limit and waits again. Nothing is sent while the speed is
    not positive, and the loop idles while ``mode`` is ``"position"``.
    """
    while True:
        # Snapshot the shared settings once so the whole cycle works with a
        # consistent set even if the websocket handler changes them meanwhile.
        upperloc = upper
        lowerloc = lower
        speedloc = speed

        # Value sent after "S" in each command.
        msspeed = int((speedloc * 1000) / 2)

        # Test the snapshot (not the global) so the divisor checked here is
        # the divisor actually used; the global may change between the two.
        if speedloc != 0:
            timedelay = (((upperloc - lowerloc) * 0.1) / speedloc) / 6
        else:
            timedelay = 1

        # Pause between commands is snapped to 0.25 s or 0.15 s for short
        # delays; longer delays (and exactly 0.16 s) are used unchanged.
        if 0.16 < timedelay < 0.25:
            timedelay = 0.25
        elif timedelay < 0.16:
            timedelay = 0.15

        moving = speedloc > 0

        # Up
        if moving:
            _send_stroke(upperloc, msspeed)
        time.sleep(timedelay)

        # Down
        if moving:
            _send_stroke(lowerloc, msspeed)
        time.sleep(timedelay)

        # Stay out of the way while direct position commands are in control.
        while mode == "position":
            time.sleep(1)
def _position_command(position):
    """Build the serial command that moves axis L0 to *position* percent.

    The position is rounded and clamped to 0-100. Raises ValueError,
    TypeError or OverflowError when it cannot be read as a finite number.
    """
    percent = max(0, min(100, int(round(float(position)))))
    # Three digits in tenths of a percent; full travel is sent as 999.
    digits = "999" if percent == 100 else f"{percent * 10:03d}"
    return "L0" + digits + header


async def write(websocket):
    """Handle one websocket connection, applying each JSON message received.

    Every message must be a JSON object with a ``"mode"`` key:

    * ``"position"`` - needs ``"position"``; one move command is written to
      the serial port immediately.
    * ``"speed"`` - needs ``"speed"``, ``"upper"`` and ``"lower"``; the values
      are published to the module-level settings read by the
      ``speed_background`` worker, which is started if it is not running.

    Any other mode is only recorded in the module-level ``mode``. Malformed
    messages are reported and skipped without changing any shared state.
    """
    global mode, speed, upper, lower, b
    async for message in websocket:
        # Parse and validate everything first so that a bad message cannot
        # leave the shared settings half-updated.
        try:
            parsed = json.loads(message)
            new_mode = parsed["mode"]
            if new_mode == "position":
                command = _position_command(parsed["position"])
            elif new_mode == "speed":
                new_speed = int(parsed["speed"]) / 10
                new_upper = parsed["upper"]
                new_lower = parsed["lower"]
        except (KeyError, TypeError, ValueError, OverflowError) as exc:
            print("ignored malformed message: " + repr(exc))
            continue

        mode = new_mode
        if new_mode == "position":
            ser.write(command.encode())
            print(command)
        elif new_mode == "speed":
            upper = new_upper
            lower = new_lower
            # Publish the final value in a single assignment so the worker
            # thread never observes the unscaled intermediate value.
            speed = new_speed
            if not b.is_alive():
                if b.ident is not None:
                    # The previous worker has already run and stopped; a
                    # Thread object can only be started once, so replace it.
                    b = threading.Thread(name=b.name, target=speed_background,
                                         daemon=b.daemon)
                b.start()
async def main():
    """Serve the ``write`` handler over WebSocket on localhost:8088 forever."""
    server = websockets.serve(write, "localhost", 8088)
    async with server:
        # A future that is never resolved keeps the server running
        # until the task is cancelled.
        never_done = asyncio.Future()
        await never_done
# Worker thread that produces the strokes in speed mode; it is started on
# demand by the websocket handler. It is a daemon thread because its loop
# never returns: a non-daemon thread would keep the process alive (and keep
# writing to the serial port) after the server has been stopped.
b = threading.Thread(name='speed_background', target=speed_background,
                     daemon=True)
try:
    asyncio.run(main())
except KeyboardInterrupt:
    # Ctrl+C is the normal way to stop the bridge; exit without a traceback.
    pass