blob: 8f2d77c2c4dca8806b0697b4fe3f31f18b5272d2 [file] [log] [blame]
Basil Eljuse4b14afb2020-09-30 13:07:23 +01001#!/usr/bin/env python3
2
3from __future__ import print_function
4
5__copyright__ = """
6/*
7 * Copyright (c) 2020, Arm Limited. All rights reserved.
8 *
9 * SPDX-License-Identifier: BSD-3-Clause
10 *
11 */
12 """
13
14""" db_manager.py:
15
16 Database interface abstraction class. This class is aimed at providing an
17 asynchronous interface between a blocking IO resource(database) and a
18 public interface designed for high concurrency.
19
20"""
21
22import time
23import threading
24from queue import Queue
25from pprint import pformat
26from influxdb import InfluxDBClient
27
28import constants
29from data_converter import DataConverter
30
31
32class dbManager(object):
33 def __init__(self,
34 host=constants.HOST,
35 port=constants.PORT,
36 user=None,
37 password=None,
38 buff_size=constants.BUFF_SIZE,
39 poll_delay=constants.POLL_DELAY,
40 app=None):
41 self.queue_buff_sz = buff_size
42 self.poll_delay = poll_delay
43
44 self.db_host = host
45 self.db_port = port
46 self.db_user = user
47 self.db_pass = password
48 self.write_queue = Queue(maxsize=self.queue_buff_sz)
49 self.stop_threads = False
50 self.app = app
51
52 for key in constants.DATABASE_DICT:
53 client = InfluxDBClient(host=self.db_host,
54 port=self.db_port,
55 username=self.db_user,
56 password=self.db_pass,
57 database=constants.DATABASE_DICT[key])
58 setattr(self, key.lower() + '_client', client)
59
60 def store(self, data):
61 """
62 Places data in the FIFO to be broadcast when
63 the database is not busy
64
65 :param: data: Data to be placed in FIFO
66 """
67 validation = 'OK'
68 err_code = 204
69 try:
70 self.write_queue.put(data)
71 except Exception as e:
72 validation = "** Write to Queue Failed. ** "
73 err_code = 402
74 print(validation, e)
75 self.app.logger.error(pformat({"error_code": err_code,
76 "info": validation, "exception": e}))
77 return validation, err_code
78
79 def get_db_client(self, metrics):
80 if "stats" in metrics:
81 client = metrics.replace("_stats", "") + "_client"
82 elif "tracking" in metrics:
83 client = metrics.replace("_tracking", "") + "_client"
84 else:
85 client = metrics + "_client"
86 if hasattr(self, client):
87 return getattr(self, client)
88 else:
89 self.app.logger.error("Invalid metrics %" % (metrics))
90
91 def write_db_direct(self, data):
92 """
93 Write data to database ( will block if database is busy )
94
95 :param: data: data to be written to database
96 """
97 db_client = self.get_db_client(data['metadata']['metrics'])
98
99 converted_data = DataConverter.convert_data(data)
100
101 if db_client:
102 if db_client.write_points(converted_data):
103 self.app.logger.info(
104 "Writing to InfluxDB hosted at %s "
105 "has been successful for %s!" %
106 (self.db_host, data['metadata']['metrics']))
107 else:
108 self.app.logger.error(
109 "Writing to InfluxDB hosted at %s "
110 "has FAILED for %s!" %
111 (self.db_host, data['metadata']['metrics']))
112 else:
113 self.app.logger.error(
114 "%s database not connected.." %
115 data['metadata']['metrics'])
116
117 def start_daemon(self):
118 """
119 Spawn a new thread that will consume data in the write FIFO
120 and place it into the databse
121 """
122
123 def write_db_loop():
124
125 while True:
126 try:
127 time.sleep(self.poll_delay)
128 if self.stop_threads:
129 self.app.logger.info(
130 "\n ** Shutting Down Database Writer **")
131 return
132 elif self.write_queue.qsize() > 0:
133 dt = self.write_queue.get()
134 # Write the data to the databse
135 self.write_db_direct(dt)
136 self.write_queue.task_done()
137 except Exception as e:
138 self.app.logger.error(
139 "** DB Writer Thread Failed. ** \n%s" % e)
140
141 self.db_write_thread = threading.Thread(target=write_db_loop)
142 self.db_write_thread.daemon = True
143 self.db_write_thread.start()
144 return self
145
146 def stop(self):
147 """
148 Flag which terminates db_write_threads loop
149 """
150 self.app.logger.info("** Setting stop_threads to True **")
151 self.stop_threads = True