OSR to XToys

unlisted ⁨1⁩ ⁨file⁩ 2023-11-10 02:08:50 UTC

osr_to_xtoys

Raw
import serial
import time
import asyncio
import websockets
import json
import threading

# Com Port Settings
ser = serial.Serial()
ser.port = 'COM3'
ser.baudrate = 115200
ser.open()

# Declare global variables
speed = None
upper = None
lower = None
mode = None
header = '\r\n'


def speed_background():
    mess = None
    while True:
        upperloc = upper
        lowerloc = lower
        speedloc = speed
        msspeed = int((speedloc * 1000) / 2)

        if speed != 0:
            timedelay = (((upperloc - lowerloc) * 0.1) / speedloc) / 6
        else:
            timedelay = 1

        if timedelay > 0.16 and timedelay < 0.25:
            timedelay = 0.25
        elif timedelay < 0.16:
            timedelay = 0.15

        if upperloc == 100:
            upperloc = 999

        # Up
        if len(str(upperloc)) == 2 and speedloc > 0:
            mess = "L0" + str(upperloc) + "0S" + str(msspeed) + header
            print("message: " + mess)
            ser.write(mess.encode())
        elif len(str(upperloc)) == 3 and speedloc > 0:
            mess = "L0" + str(upperloc) + "S" + str(msspeed) + header
            print("message: " + mess)
            ser.write(mess.encode())

        time.sleep(timedelay)

        # Down
        if len(str(lowerloc)) == 1 and speedloc > 0:
            mess = "L000" + str(lowerloc) + "S" + str(msspeed) + header
            print("message: " + mess)
            ser.write(mess.encode())
        elif len(str(lowerloc)) == 2 and speedloc > 0:
            mess = "L0" + str(lowerloc) + "0S" + str(msspeed) + header
            print("message: " + mess)
            ser.write(mess.encode())

        time.sleep(timedelay)

        while mode == "position":
            time.sleep(1)


async def write(websocket):
    async for message in websocket:
        parsed = json.loads(message)
        global mode, speed, upper, lower
        mode = parsed["mode"]
        threadrunning = b.is_alive()
        if mode == "position":
            pos = parsed["position"]
            pos = str(pos)
            if len(pos) == 1:
                mess = str("L00" + pos + "0" + header)
                ser.write(mess.encode())
            elif len(pos) == 2:
                mess = "L0" + pos + "0" + header
                ser.write(mess.encode())
            elif len(pos) == 3:
                mess = "L0999" + header
                ser.write(mess.encode())
                print(mess)
        elif mode == "speed":

            speed = int(parsed["speed"])
            upper = parsed["upper"]
            lower = parsed["lower"]

            speed = speed / 10

            if threadrunning == False:
                b.start()


async def main():
    async with websockets.serve(write, "localhost", 8088):
        await asyncio.Future()  # run forever


b = threading.Thread(name='speed_background', target=speed_background)

asyncio.run(main())