firmware+controller: Initial working HID state control

This commit is contained in:
2024-12-13 17:46:59 +00:00
parent 6d0db71305
commit 277eb4ee3b
5 changed files with 138 additions and 23 deletions

View File

@@ -1,17 +1,22 @@
#!/usr/bin/env python3
import hid
import logging
USB_VID = 0x6969
USB_PID = 0x0004
import valconomy
def main():
hid_handle = hid.device()
hid_handle.open(vendor_id=USB_VID, product_id=USB_PID)
logging.basicConfig(
format='%(asctime)s %(name)s %(levelname)s %(message)s', level=logging.INFO)
h = valconomy.HIDValconomyHandler()
try:
hid_handle.write(b'\x00test')
h.menu(None, False)
h.none()
h.menu(None, True)
h.idle(None)
h.service()
finally:
hid_handle.close()
h.close()
if __name__ == '__main__':
main()

View File

@@ -1,10 +1,13 @@
import base64
from dataclasses import dataclass
from enum import Enum
import errno
import json
import logging
from pprint import pprint
import os
import queue
import struct
import time
from typing import Callable
@@ -49,12 +52,12 @@ class RiotPlayerInfo:
return f'{self.name}#{self.tag}'
class ValorantLocalClient:
lockfile_path = os.path.join(os.environ['LOCALAPPDATA'], r'Riot Games\Riot Client\Config\lockfile')
poll_interval = 0.5
def __init__(self, callback: Callable[[RiotPlayerInfo, bool], None]):
self.callback = callback
self.lockfile_path = os.path.join(os.environ['LOCALAPPDATA'], r'Riot Games\Riot Client\Config\lockfile')
self.port, self.password = None, None
self.running = True
self.players = {}
@@ -228,7 +231,78 @@ class ValconomyHandler:
log.info('Hard luck...')
class HIDValconomyHandler(ValconomyHandler):
pass
vid = 0x6969
pid = 0x0004
def __init__(self):
self.dev = None
self.running = True
self.queue = queue.Queue(128)
def close(self):
if self.dev is not None:
self.dev.close()
def _dev_ready(self):
try:
if self.dev is None:
dev = hid.device()
dev.open(vendor_id=self.vid, product_id=self.pid)
self.dev = dev
log.info(f'USB device opened')
# 2 bytes: report ID and returned value
# We get back the same report ID and value
data = self.dev.get_input_report(0, 2)
assert len(data) == 2
return data[1] == 1
except OSError:
if self.dev is not None:
log.warn(f'USB device lost')
self.dev = None
return False
def _do(self, cmd, *vals, fmt=None):
if fmt is None:
fmt = ''
fmt = '<BB' + fmt
# Prepend report ID 0
data = struct.pack(fmt, *(0, cmd) + vals)
self.dev.write(data)
def _enq(self, cmd, *vals, fmt=None):
self.queue.put((cmd, vals, fmt))
def service(self):
while not self.queue.empty():
cmd, vals, fmt = self.queue.get()
while not self._dev_ready():
if not self.running:
break
time.sleep(0.1)
try:
self._do(cmd, *vals, fmt=fmt)
except OSError as ex:
log.warn(f'USB device lost, state dequeuing stalled')
def run(self):
while self.running:
while self.queue.empty():
time.sleep(0.5)
self.service()
def none(self):
self._enq(0)
def menu(self, info: RiotPlayerInfo, was_idle: bool=False):
self._enq(1, 1 if was_idle else 0, fmt='B')
def idle(self, info: RiotPlayerInfo):
self._enq(2)
class GameState(Enum):
NONE = 0