Testy protokołu WebSocket

Coraz częściej, użytkownicy aplikacji oczekują od niej działania w czasie rzeczywistym – dotyczy to nie tylko rozmaitych komunikatorów (to przykład, który najłatwiej sobie wyobrazić), ale wszystkich aplikacji, które wyświetlają dane, mogące się często zmieniać. Użytkownik nie chce odświeżać strony/widoku tylko po to, żeby sprawdzić czy interesujące go dane się zmieniły, lub otrzymał nową wiadomość na czacie.

Można spróbować wprowadzić rozwiązania oparte na protokole HTTP, np. wysyłanie w krótkim interwale czasowym zapytania po nowe wiadomości. Łatwo sobie jednak wyobrazić, że takie podejście generuje ogromny ruch i obciąża serwer, który co chwile musi odpowiadać „Nie ma żadnej nowej wiadomości.”

Rozwiązaniem tego zagadnienia może być protokół WebSocket, zapewniający dwukierunkowy kanał komunikacji za pośrednictwem jednego gniazda TCP.

WebSocket

Komunikacja w protokole HTTP wygląda tak:

Klient wysyła żądanie, wykorzystując jedną z metod (GET, POST, DELETE etc.). Jako informacja zwrotna, zostaje zwrócona odpowiedź (zawartość zasobu o który prosiliśmy, lub informacja o jego zapisie/usunięciu etc.). Jak widać, serwer nie ma możliwości poinformowania klienta, że w raz udostępnionym zasobie zaszły jakieś zmiany – klient musi się o to zapytać.

Inaczej sytuacja wygląda w przypadku protokołu WebSocket: na samym początku klient wysyła do serwera zapytanie, w którym informuje go o przejściu na protokół WebSocket (wykorzystując nagłówek „Upgrade”), jeżeli serwer obsługuje ten protokół – zostaje nawiązane dwukierunkowe połączenie – klient będzie mógł otrzymywać informacje bez konieczności wysyłania kolejnych zapytań.

Testy

Zadanie wydawało się proste: przetestowanie REST API, w którym część komunikacji odbywa się przez protokół WebSocket. Założenie działania jest bardzo proste: do API wpiętych może być wielu użytkowników, jeżeli jeden z nich zmieni informacje o zasobie: reszta powinna zostać o tym poinformowana (wyświetlane informacje powinny się zmienić), mniej więcej w ten sposób:

Od początku wiadomo było, że testy WebSocket chcemy wpiąć razem z resztą testów API, muszą więc być od siebie zupełnie niezależne, każdy WebSocket musi być otwierany dla użytkownika (lub użytkowników) testowego (testowych) generowanego w danym teście, dla zasobów tworzonych w danej metodzie testowej. Wynika to z tego, że każdy test w naszych projektach jest uruchamiany równolegle (to znakomicie przyspiesza czas ich wykonywania). Kod testów powinien zatem zawierać metody, które będą otwierać połączenia WebSocket na konkretne zasoby.

Kolejnym aspektem było zachowanie możliwie prostego zapisu samych skryptów testowych.

Przykład testu

Uproszczony scenariusz testu wygląda może wyglądać następująco:

      1. SetUp
        1. Tworzymy zasób
        2. Tworzymy dwóch użytkowników testowych
      2. Test:
        1. Tworzymy WebSocket dla użytkownika 2
        2. Użytkownik 1 zmienia dane zasobu
      3. Asercja:
        1. Użytkownik 2 otrzymał informacje o zmianie zasobu

Kod

Do pracy z WebSocket wykorzystałem pythonową bibliotekę websocket-client, która pozwala na szybkie zestawienie połączenia. Chciałem ją jednak opakować wewnątrz klasy – głównie żeby prawidłowo obsłużyć otrzymane wiadomości (bo to one głównie nas interesowały)

Problemy, które się pojawiły w trakcie:

  • Przekazywanie metody z wnętrza klasy do inicjalizacji WebSocketApp (tak, aby callbacki z WebSocket mogły się do nich odwołać): z nimi radzi sobie opakowanie metod w lambdy.
  • Uruchamianie WebSocketa tak aby dalszy kod testów mógł się wykonywać: użycie wątków.
  • Obsłużenie problemu powyżej poprzez uruchamianie oddzielnego wątku zrodziło następny problem: test przechodził dalej, zanim zakończone zostało ustanawianie połączenia: z tym problemem radzi sobie metoda „wait_for_connection”.
  • Zamykanie połączenia bez konieczności pamiętania o tym w kodzie samego testu: dlatego korzystamy z rozwiązania context managera, w metodzie „__enter__” uruchamiany jest wątek z WebSocketem, a w „__exit__” WebSocket jest zamykany, a wątek kończony.
  • Odczekiwanie na pojawienie się interesującej nas wiadomości: to obsługuje ta pętla wewnątrz właściwości „message”

W efekcie klasa ogsługąjąca WebSocket wyglądała tak:

import _thread
from json import loads, dumps
from threading import Thread
import websocket
from time import sleep
from configs.env_configs import WSS_URL


class BackgroundSubscription:
    def __init__(self, headers, subscription):
        self.headers = headers
        self.subscrption = subscription
        self._msg = None
        self.ws = None
        self.started = False

    def __enter__(self):
        self.ws_thread = Thread(target=self.conn, args=())
        self.ws_thread.start()
        self.wait_for_connection()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.ws.close()
        self.ws_thread.join()

    # we have to be sure, that connection is established
    def wait_for_connection(self):
        cnt = 0
        while not self.started and cnt < 10:
            sleep(0.2)
            cnt += 1
        if not self.started:
            self.ws.close()
            self.ws_thread.join()
            raise Exception("WebSocket connection isn't established")


    def conn(self):
        self.ws = websocket.WebSocketApp(WSS_URL,
                                         on_message=lambda ws, msg: self.on_message(ws, msg),
                                         on_error=lambda ws, msg: self.on_error(ws, msg),
                                         on_close=lambda ws: self.on_close(ws),
                                         )
        self.ws.on_open = lambda ws: self.on_open()
        self.ws.run_forever()

    @property
    def message(self):
        cnt = 0
        while self._msg is None and cnt < 10:
            sleep(0.5)
            cnt += 1
        return self._msg

    def on_error(self, ws, error):
        print(error)

    def on_close(self, ws):
        pass

    def on_open(self):
        def run():
            # content of these messages depends on your API
            message = dumps({"type": "init", "payload": {"Authorization": self.headers["Authorization"]}})
            self.ws.send(message)
            message = dumps({"id": '1', "type": 'subscription_start', "query": self.subscription})
            self.ws.send(message)
        _thread.start_new_thread(run, ())
        self.started = True

    def on_message(self, ws, msg):
        # we want only certain data from response
        if "subscription_data" in msg:
            self._msg = loads(msg)["payload"]

Klasa BackgroundSubscription opakowywana jest w klasach pośrednich (steps), które np. tworzą subskrypcje na konkretne zasoby. Do tak opakowanej klasy można wygodnie odwołać się w samych testach:

def test_data_changed_response(self):
    self.DATA.NAME = "SUBSCRIPTION TEST"
    with self.STEPS.data_subscription(event_id) as subscription:
        self.STEPS.update_data(data_id, self.DATA)
        response = subscription.message
    assert_json_schema(response, "data_schema")

Część z kodu tego testu może nie być oczywista, wynika to z faktu, że korzystam w nich z modeli danych (self.DATA) i warstwy abstrakcji (self.STEPS), oraz asercji dla schematów JSON. O tym postaram się jeszcze napisać.

Tym co nas najbardziej interesuje to fragment „with….”: to właśnie pythonowy context manager który pozwala nam na nieprzejmowanie się zamknięciem WebSocket – robi to za nas, jeżeli pamiętaliśmy o zaimplementowaniu tego w metodzie „__exit__’.

Podsumowanie

Z punktu widzenia testowania REST API – WebSocket nie różni się od zwykłych testów, w dalszym ciągu możemy testować poprawność otrzymanych danych. Różnice mają bardziej techniczny charakter – konieczne jest zaimplementowanie obsługi protokołu, oraz pamiętanie o jego zamknięciu.

Komentarze