firmware+controller: Initial working HID state control

This commit is contained in:
2024-12-13 17:46:59 +00:00
parent 6d0db71305
commit 6d2d07d595
5 changed files with 133 additions and 24 deletions

View File

@@ -1,17 +1,18 @@
#!/usr/bin/env python3
import hid
USB_VID = 0x6969
USB_PID = 0x0004
import valconomy
def main():
hid_handle = hid.device()
hid_handle.open(vendor_id=USB_VID, product_id=USB_PID)
h = valconomy.HIDValconomyHandler()
try:
hid_handle.write(b'\x00test')
h.menu(None, False)
h.none()
h.menu(None, True)
h.idle(None)
h.service()
finally:
hid_handle.close()
h.close()
if __name__ == '__main__':
main()

View File

@@ -1,10 +1,13 @@
import base64
from dataclasses import dataclass
from enum import Enum
import errno
import json
import logging
from pprint import pprint
import os
import queue
import struct
import time
from typing import Callable
@@ -49,12 +52,12 @@ class RiotPlayerInfo:
return f'{self.name}#{self.tag}'
class ValorantLocalClient:
lockfile_path = os.path.join(os.environ['LOCALAPPDATA'], r'Riot Games\Riot Client\Config\lockfile')
poll_interval = 0.5
def __init__(self, callback: Callable[[RiotPlayerInfo, bool], None]):
self.callback = callback
self.lockfile_path = os.path.join(os.environ['LOCALAPPDATA'], r'Riot Games\Riot Client\Config\lockfile')
self.port, self.password = None, None
self.running = True
self.players = {}
@@ -228,7 +231,76 @@ class ValconomyHandler:
log.info('Hard luck...')
class HIDValconomyHandler(ValconomyHandler):
pass
vid = 0x6969
pid = 0x0004
def __init__(self):
self.dev = None
self.running = True
self.queue = queue.Queue(128)
def close(self):
if self.dev is not None:
self.dev.close()
def _dev_ready(self):
try:
if self.dev is None:
self.dev = hid.device()
self.dev.open(vendor_id=self.vid, product_id=self.pid)
log.info(f'USB device opened')
# 2 bytes: report ID and returned value
# We get back the same report ID and value
data = self.dev.get_input_report(0, 2)
assert len(data) == 2
return data[1] == 1
except OSError:
self.dev = None
return False
def _do(self, cmd, *vals, fmt=None):
if fmt is None:
fmt = ''
fmt = '<BB' + fmt
# Prepend report ID 0
data = struct.pack(fmt, *(0, cmd) + vals)
self.dev.write(data)
def _enq(self, cmd, *vals, fmt=None):
self.queue.put((cmd, vals, fmt))
def service(self):
while not self.queue.empty():
cmd, vals, fmt = self.queue.get()
try:
while not self._dev_ready():
if not self.running:
break
time.sleep(0.1)
self._do(cmd, *vals, fmt=fmt)
except OSError as ex:
log.warn(f'USB device lost, state dequeuing stalled')
continue
def run(self):
while self.running:
while self.queue.empty():
time.sleep(0.5)
self.service()
def none(self):
self._enq(0)
def menu(self, info: RiotPlayerInfo, was_idle: bool=False):
self._enq(1, 1 if was_idle else 0, fmt='B')
def idle(self, info: RiotPlayerInfo):
self._enq(2)
class GameState(Enum):
NONE = 0