simple_server.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. from openleadr import OpenADRClient, OpenADRServer, enums
  13. from openleadr.utils import generate_id, normalize_dict, timedeltaformat, datetimeformat, booleanformat
  14. from openleadr.messaging import create_message, parse_message
  15. from datetime import datetime, timezone, timedelta
  16. import asyncio
  17. import sqlite3
  18. import pytest
  19. from aiohttp import web
  20. import json
  21. SERVER_PORT = 8001
  22. VEN_NAME = 'myven'
  23. VTN_ID = "TestVTN"
  24. class EventFormatter(json.JSONEncoder):
  25. def default(self, obj):
  26. if isinstance(obj, timedelta):
  27. return timedeltaformat(obj)
  28. if isinstance(obj, datetime):
  29. return datetimeformat(obj)
  30. if isinstance(obj, bool):
  31. return booleanformat(obj)
  32. return json.JSONEncoder.default(self, obj)
  33. DB = sqlite3.connect(":memory:")
  34. with DB:
  35. DB.execute("CREATE TABLE vens (ven_id STRING, ven_name STRING, online BOOLEAN, last_seen DATETIME, registration_id STRING)")
  36. DB.execute("CREATE TABLE events (event_id STRING, ven_id STRING, request_id STRING, status STRING, event JSON, created_at DATETIME, updated_at DATETIME)")
  37. def lookup_ven(ven_name):
  38. with DB:
  39. DB.execute("SELECT * FROM vens WHERE ven_name = ?", (ven_name,))
  40. ven = DB.fetchone()
  41. return ven
  42. def add_ven(ven_name, ven_id, registration_id):
  43. with DB:
  44. DB.execute("""INSERT INTO vens (ven_id, ven_name, online, last_seen, registration_id)
  45. VALUES (?, ?, ?, ?, ?)""", (ven_id, ven_name, True, datetime.now().replace(microsecond=0), registration_id))
  46. def add_event(ven_id, event_id, event):
  47. serialized_event = json.dumps(event, cls=EventFormatter)
  48. with DB:
  49. DB.execute("""INSERT INTO events (ven_id, event_id, request_id, status, event)
  50. VALUES (?, ?, ?, ?, ?)""", (ven_id, event_id, None, 'new', serialized_event))
  51. async def _on_poll(ven_id, request_id=None):
  52. cur = DB.cursor()
  53. cur.execute("""SELECT event_id, event FROM events WHERE ven_id = ? AND status = 'new' LIMIT 1""", (ven_id,))
  54. result = cur.fetchone()
  55. if result:
  56. event_id, event = result
  57. event_request_id = generate_id()
  58. with DB:
  59. DB.execute("""UPDATE events SET request_id = ? WHERE event_id = ?""", (event_request_id, event_id))
  60. response_type = 'oadrDistributeEvent'
  61. response_payload = {'response': {'request_id': request_id,
  62. 'response_code': 200,
  63. 'response_description': 'OK'},
  64. 'request_id': event_request_id,
  65. 'vtn_id': VTN_ID,
  66. 'events': [json.loads(event)]}
  67. else:
  68. response_type = 'oadrResponse'
  69. response_payload = {'response': {'request_id': request_id,
  70. 'response_code': 200,
  71. 'response_description': 'OK'},
  72. 'ven_id': ven_id}
  73. return response_type, response_payload
  74. async def _on_create_party_registration(payload):
  75. registration_id = generate_id()
  76. ven_id = generate_id()
  77. add_ven(payload['ven_name'], ven_id, registration_id)
  78. payload = {'response': {'response_code': 200,
  79. 'response_description': 'OK',
  80. 'request_id': payload['request_id']},
  81. 'ven_id': ven_id,
  82. 'registration_id': registration_id,
  83. 'profiles': [{'profile_name': '2.0b',
  84. 'transports': {'transport_name': 'simpleHttp'}}],
  85. 'requested_oadr_poll_freq': timedelta(seconds=10)}
  86. return 'oadrCreatedPartyRegistration', payload
  87. server = OpenADRServer(vtn_id=VTN_ID)
  88. server.add_handler('on_create_party_registration', _on_create_party_registration)
  89. server.add_handler('on_poll', _on_poll)
  90. @pytest.fixture
  91. async def start_server():
  92. runner = web.AppRunner(server.app)
  93. await runner.setup()
  94. site = web.TCPSite(runner, 'localhost', SERVER_PORT)
  95. await site.start()
  96. print("SERVER IS NOW RUNNING")
  97. yield
  98. print("SERVER IS NOW STOPPING")
  99. await runner.cleanup()