simple_server.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. from pyopenadr import OpenADRClient, OpenADRServer, enums
  2. from pyopenadr.utils import generate_id, normalize_dict, timedeltaformat, datetimeformat, booleanformat
  3. from pyopenadr.messaging import create_message, parse_message
  4. from datetime import datetime, timezone, timedelta
  5. import asyncio
  6. import sqlite3
  7. import pytest
  8. from aiohttp import web
  9. import json
  10. SERVER_PORT = 8001
  11. VEN_NAME = 'myven'
  12. VTN_ID = "TestVTN"
  13. class EventFormatter(json.JSONEncoder):
  14. def default(self, obj):
  15. if isinstance(obj, timedelta):
  16. return timedeltaformat(obj)
  17. if isinstance(obj, datetime):
  18. return datetimeformat(obj)
  19. if isinstance(obj, boolean):
  20. return booleanformat(obj)
  21. return json.JSONEncoder.default(self, obj)
  22. DB = sqlite3.connect(":memory:")
  23. with DB:
  24. DB.execute("CREATE TABLE vens (ven_id STRING, ven_name STRING, online BOOLEAN, last_seen DATETIME, registration_id STRING)")
  25. DB.execute("CREATE TABLE events (event_id STRING, ven_id STRING, request_id STRING, status STRING, event JSON, created_at DATETIME, updated_at DATETIME)")
  26. def lookup_ven(ven_name):
  27. with DB:
  28. DB.execute("SELECT * FROM vens WHERE ven_name = ?", (ven_name,))
  29. ven = cur.fetchone()
  30. return ven
  31. def add_ven(ven_name, ven_id, registration_id):
  32. with DB:
  33. DB.execute("""INSERT INTO vens (ven_id, ven_name, online, last_seen, registration_id)
  34. VALUES (?, ?, ?, ?, ?)""", (ven_id, ven_name, True, datetime.now().replace(microsecond=0), registration_id))
  35. def add_event(ven_id, event_id, event):
  36. serialized_event = json.dumps(event, cls=EventFormatter)
  37. with DB:
  38. DB.execute("""INSERT INTO events (ven_id, event_id, request_id, status, event)
  39. VALUES (?, ?, ?, ?, ?)""", (ven_id, event_id, None, 'new', serialized_event))
  40. async def _on_poll(ven_id, request_id=None):
  41. cur = DB.cursor()
  42. cur.execute("""SELECT event_id, event FROM events WHERE ven_id = ? AND status = 'new' LIMIT 1""", (ven_id,))
  43. result = cur.fetchone()
  44. if result:
  45. event_id, event = result
  46. event_request_id = generate_id()
  47. with DB:
  48. DB.execute("""UPDATE events SET request_id = ? WHERE event_id = ?""", (event_request_id, event_id))
  49. response_type = 'oadrDistributeEvent'
  50. response_payload = {'response': {'request_id': request_id,
  51. 'response_code': 200,
  52. 'response_description': 'OK'},
  53. 'request_id': event_request_id,
  54. 'vtn_id': VTN_ID,
  55. 'events': [json.loads(event)]}
  56. else:
  57. response_type = 'oadrResponse'
  58. response_payload = {'response': {'request_id': request_id,
  59. 'response_code': 200,
  60. 'response_description': 'OK'},
  61. 'ven_id': ven_id}
  62. return response_type, response_payload
  63. async def _on_create_party_registration(payload):
  64. registration_id = generate_id()
  65. ven_id = generate_id()
  66. add_ven(payload['ven_name'], ven_id, registration_id)
  67. payload = {'response': {'response_code': 200,
  68. 'response_description': 'OK',
  69. 'request_id': payload['request_id']},
  70. 'ven_id': ven_id,
  71. 'registration_id': registration_id,
  72. 'profiles': [{'profile_name': '2.0b',
  73. 'transports': {'transport_name': 'simpleHttp'}}],
  74. 'requested_oadr_poll_freq': timedelta(seconds=10)}
  75. return 'oadrCreatedPartyRegistration', payload
  76. server = OpenADRServer(vtn_id=VTN_ID)
  77. server.add_handler('on_create_party_registration', _on_create_party_registration)
  78. server.add_handler('on_poll', _on_poll)
  79. @pytest.fixture
  80. async def start_server():
  81. runner = web.AppRunner(server.app)
  82. await runner.setup()
  83. site = web.TCPSite(runner, 'localhost', SERVER_PORT)
  84. await site.start()
  85. print("SERVER IS NOW RUNNING")
  86. yield
  87. print("SERVER IS NOW STOPPING")
  88. await runner.cleanup()