# =============================================================================
# >> IMPORTS
# =============================================================================
# Python
import asyncio
from base64 import b64encode
from hashlib import sha256
import json
from traceback import format_exc
# websockets
import websockets
# obs-ws-rc
from .events import events
from .logs import logger
from .requests import (
AuthenticateRequest, GetAuthRequiredRequest, ResponseStatus)
# =============================================================================
# >> CONSTANTS
# =============================================================================
URI_TEMPLATE = "ws://{host}:{port}/"
DEFAULT_PORT = 4444
# =============================================================================
# >> CLASSES
# =============================================================================
[docs]class AuthError(Exception):
"""Raised by :meth:`OBSWS.connect` if authentication has failed."""
pass
[docs]class OBSWS:
"""Main class used for obs-websocket communication. Can be used as a
context manager (given you use it in ``async with`` statement).
Example usage::
async with OBSWS("localhost") as obsws:
...
This is an equivalent to the following::
obsws = OBSWS("localhost")
await obsws.connect()
try:
...
finally:
await obsws.close()
This class also supports Future-like protocol (it implements
:meth:`__await__` method). You can await for the OBSWS instance for it
to close::
await obsws
.. note::
When entering the context manager (using ``async with`` statement),
you should be ready to except :exc:`AuthError` that might raise due to
failed auth, or :exc:`OSError` that can be raised by the underlying
websockets library in case of being unable to connect to OBS Studio.
.. seealso::
:meth:`connect`
:meth:`close`
"""
def __init__(self, host, port=DEFAULT_PORT, password=None, *,
skip_auth=False, loop=None):
"""
:param str host: Server host
:param int port: Server port
:param str|None password: Server password (if needed)
:param bool skip_auth: Whether or not to skip authentication
:param asyncio.AbstractEventLoop|None loop: Event loop to use
"""
self._host = host
self._port = port
self._password = password
self._skip_auth = skip_auth
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._message_map = {}
self._message_count = 0
self._event_handlers = {}
self._ws = None
self._ws_close_event = asyncio.Event(loop=self._loop)
self._done_event = asyncio.Event(loop=self._loop)
self._done_event.set()
self._event_futures = {}
def __await__(self):
async def coro():
return await self._done_event.wait()
return coro().__await__()
@property
def host(self):
"""The host that OBSWS was instantiated with (read-only).
:return: Server host
:rtype: str
"""
return self._host
@property
def port(self):
"""The port that OBSWS was instantiated with (read-only).
:return: Server port
:rtype: int
"""
return self._port
@property
def password(self):
"""The port that OBSWS was instantiated with (read-only).
:return: Server password (``None`` if not given)
:rtype: str|None
"""
return self._password
@property
def closed(self):
"""Return whether or not this OBSWS instance is closed."""
return self._done_event.is_set()
[docs] async def connect(self):
"""Establish connection to the server, start the event loop and
perform authentication (the latter can be skipped with ``skip_auth``
argument in :meth:`__init__`).
:raises ValueError: if already connected
:raises AuthError: if auth is enabled but password is invalid or not
not set
:raises OSError: raised by the underlying websockets library if
connection attempt is failed
.. note::
This method is a coroutine.
"""
if self._ws is not None:
raise ValueError("Already connected")
self._ws = await websockets.connect(URI_TEMPLATE.format(
host=self._host,
port=self._port
))
self._ws_close_event.clear()
asyncio.ensure_future(self._recv_loop())
if not self._skip_auth:
try:
await self._authenticate()
except AuthError:
await self._close()
await self._done_event.wait()
raise
async def _close(self):
"""Close the underlying websocket connection.
.. note::
This method is a coroutine.
"""
if self._ws is None:
return
try:
await self._ws.close()
except ConnectionResetError:
pass
self._ws = None
self._ws_close_event.set()
for future in self._message_map.values():
future.set_result(None)
for future in list(self._event_futures.values()):
future.set_result(None)
self._message_map.clear()
[docs] async def close(self):
"""Clean shutdown. Consequent calls on an already closed instance have
not effect.
.. note::
This method is a coroutine.
"""
await self._close()
await self._done_event.wait()
async def __aenter__(self):
"""Enter context: connect and return self.
:return: Ready-to-work OBSWS instance
:rtype: OBSWS
.. note::
This method is a coroutine.
"""
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Leave context: clean shutdown.
.. note::
This method is a coroutine.
"""
await self._close()
await self._done_event.wait()
async def _recv_loop(self):
self._done_event.clear()
while not self._ws_close_event.is_set():
try:
data = json.loads(await self._ws.recv())
except websockets.ConnectionClosed:
await self._close()
else:
message_id = data.get('message-id')
if message_id is not None:
self._message_map.pop(message_id).set_result(data)
continue
type_name = data.get('update-type')
if type_name is not None:
asyncio.ensure_future(
self._handle_event(type_name, data), loop=self._loop)
continue
# TODO: Not a response nor an event - log an error maybe?
self._done_event.set()
async def _authenticate(self):
response = await self.require(GetAuthRequiredRequest())
if not response.auth_required:
return
if self._password is None:
raise AuthError(
"Auth is required, but the password is not provided")
auth_string = b64encode(sha256(b64encode(
sha256((self._password + response.salt).encode('utf-8')).digest()
) + response.challenge.encode('utf-8')).digest()).decode('ascii')
response = await self.require(AuthenticateRequest(auth=auth_string))
if response.status == ResponseStatus.ERROR:
raise AuthError(
"Auth has failed, error from the server: '{error}'".format(
error=response.error))
[docs] async def require(self, request):
"""Send a request to the server and await, return the response.
:param requests.BaseRequest request: Fully formed request
:return: Response from the server (None if the connection was
closed during communication)
:rtype: requests.BaseResponse|None
:raises ValueError: if not connected
.. note::
This method is a coroutine.
"""
if self._ws is None:
raise ValueError("Not connected")
self._message_count += 1
message_id = str(self._message_count)
future = self._message_map[message_id] = self._loop.create_future()
data = request.get_request_data(message_id)
await self._ws.send(json.dumps(data))
message = await future
if message is None:
return None
return request.response_class(message)
[docs] def event(self, type_name=None):
"""Return a future that, when awaited for, returns an event of type
``type_name``. If ``type_name`` is None, the future result will be the
first occurred event. If connection is closed while future is not done,
the future result is None.
:param str|None type_name: Event type to await for, ``None`` to await
for an event of any type
:return: Future
:rtype: asyncio.Future
:raises ValueError: if not connected
.. versionchanged:: 2.3.0
This method is not a coroutine now, but it returns a
:class:`asyncio.Future` object.
"""
if self._ws is None:
raise ValueError("Not connected")
if type_name not in self._event_futures:
self._event_futures[type_name] = self._loop.create_future()
def callback(f):
del self._event_futures[type_name]
self._event_futures[type_name].add_done_callback(callback)
return self._event_futures[type_name]
async def _handle_event(self, type_name, data):
event_class = events.get(type_name)
# Do we know of this event type?
if event_class is None:
# Not implemented in our protocol, no further action required
return
callbacks = self._event_handlers.get(type_name)
future_any = self._event_futures.get(None)
future_event = self._event_futures.get(type_name)
# Is there anybody willing to handle it?
if callbacks is None and future_any is None and future_event is None:
# No, so we don't even instantiate the event class
return
try:
event = event_class(data)
except:
logger.error(
"OBS-WS-RC: '{type_name}' event instantiation raised (invalid "
"protocol.json?)\n{exc}\nThe callbacks are not going to be "
"called!".format(type_name=type_name, exc=format_exc()))
return
if future_any is not None:
future_any.set_result(event)
if future_event is not None:
future_event.set_result(event)
if callbacks is not None:
coros = []
for callback in callbacks:
if asyncio.iscoroutinefunction(callback):
coros.append(callback(self, event))
else:
try:
callback(self, event)
except:
logger.error(
"OBS-WS-RC: '{type_name}' event "
"handler raised!\n{exc}\n".format(
type_name=type_name, exc=format_exc()))
close_future = asyncio.ensure_future(self._ws_close_event.wait(),
loop=self._loop)
gather_future = asyncio.gather(
*coros, return_exceptions=True, loop=self._loop)
done, pending = await asyncio.wait(
[close_future, gather_future],
return_when=asyncio.FIRST_COMPLETED,
loop=self._loop
)
if gather_future.done():
results = await gather_future
for result in results:
if isinstance(result, BaseException):
try:
raise result
except:
logger.error(
"OBS-WS-RC: '{type_name}' event "
"async handler raised!\n{exc}\n".format(
type_name=type_name, exc=format_exc()))
else:
gather_future.cancel()
if not close_future.done():
close_future.cancel()
[docs] def register_event_handler(self, type_name, callback):
"""Register event handler (either a regular one or an async-coroutine).
:param type_name: Event name
:param callable callback: Function or coroutine function
:raises ValueError: if callback is already registered for the event
.. deprecated:: 2.2
Use :meth:`event` instead.
"""
if type_name not in self._event_handlers:
self._event_handlers[type_name] = []
if callback in self._event_handlers[type_name]:
raise ValueError(
"Callback '{callback}' is already registered to handle "
"'{type_name}' event".format(
callback=callback, type_name=type_name))
self._event_handlers[type_name].append(callback)
[docs] def unregister_event_handler(self, type_name, callback):
"""Unregister previously registered event handler.
:param type_name: Event name
:param callable callback: Function or coroutine function
.. deprecated:: 2.2
Use :meth:`event` instead.
"""
self._event_handlers[type_name].remove(callback)
if not self._event_handlers[type_name]:
del self._event_handlers[type_name]