osr_to_xtoys
@@ -0,0 +1,109 @@
+import serial
+import time
+import asyncio
+import websockets
+import json
+import threading
+
+# Com Port Settings
+ser = serial.Serial()
+ser.port = 'COM3'
+ser.baudrate = 115200
+ser.open()
+
+# Declare global variables
+speed = None
+upper = None
+lower = None
+mode = None
+header = '\r\n'
+
+
+def speed_background():
+ mess = None
+ while True:
+ upperloc = upper
+ lowerloc = lower
+ speedloc = speed
+ msspeed = int((speedloc * 1000) / 2)
+
+ if speed != 0:
+ timedelay = (((upperloc - lowerloc) * 0.1) / speedloc) / 6
+ else:
+ timedelay = 1
+
+ if timedelay > 0.16 and timedelay < 0.25:
+ timedelay = 0.25
+ elif timedelay < 0.16:
+ timedelay = 0.15
+
+ if upperloc == 100:
+ upperloc = 999
+
+ # Up
+ if len(str(upperloc)) == 2 and speedloc > 0:
+ mess = "L0" + str(upperloc) + "0S" + str(msspeed) + header
+ print("message: " + mess)
+ ser.write(mess.encode())
+ elif len(str(upperloc)) == 3 and speedloc > 0:
+ mess = "L0" + str(upperloc) + "S" + str(msspeed) + header
+ print("message: " + mess)
+ ser.write(mess.encode())
+
+ time.sleep(timedelay)
+
+ # Down
+ if len(str(lowerloc)) == 1 and speedloc > 0:
+ mess = "L000" + str(lowerloc) + "S" + str(msspeed) + header
+ print("message: " + mess)
+ ser.write(mess.encode())
+ elif len(str(lowerloc)) == 2 and speedloc > 0:
+ mess = "L0" + str(lowerloc) + "0S" + str(msspeed) + header
+ print("message: " + mess)
+ ser.write(mess.encode())
+
+ time.sleep(timedelay)
+
+ while mode == "position":
+ time.sleep(1)
+
+
+async def write(websocket):
+ async for message in websocket:
+ parsed = json.loads(message)
+ global mode, speed, upper, lower
+ mode = parsed["mode"]
+ threadrunning = b.is_alive()
+ if mode == "position":
+ pos = parsed["position"]
+ pos = str(pos)
+ if len(pos) == 1:
+ mess = str("L00" + pos + "0" + header)
+ ser.write(mess.encode())
+ elif len(pos) == 2:
+ mess = "L0" + pos + "0" + header
+ ser.write(mess.encode())
+ elif len(pos) == 3:
+ mess = "L0999" + header
+ ser.write(mess.encode())
+ print(mess)
+ elif mode == "speed":
+
+ speed = int(parsed["speed"])
+ upper = parsed["upper"]
+ lower = parsed["lower"]
+
+ speed = speed / 10
+
+ if threadrunning == False:
+ b.start()
+
+
+async def main():
+ async with websockets.serve(write, "localhost", 8088):
+ await asyncio.Future() # run forever
+
+
+b = threading.Thread(name='speed_background', target=speed_background)
+
+asyncio.run(main())