Initial commit.
- qa-tools public release which includes:
- trace-based coverage tool
- quality metrics measurement and tracking setup
- associated in-source documentation.
Signed-off-by: Basil Eljuse <basil.eljuse@arm.com>
diff --git a/quality-metrics/broker-component/db_manager.py b/quality-metrics/broker-component/db_manager.py
new file mode 100644
index 0000000..8f2d77c
--- /dev/null
+++ b/quality-metrics/broker-component/db_manager.py
@@ -0,0 +1,151 @@
+#!/usr/bin/env python3
+
+from __future__ import print_function
+
+__copyright__ = """
+/*
+ * Copyright (c) 2020, Arm Limited. All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *
+ */
+ """
+
+""" db_manager.py:
+
+ Database interface abstraction class. This class is aimed at providing an
+ asynchronous interface between a blocking IO resource(database) and a
+ public interface designed for high concurrency.
+
+"""
+
+import time
+import threading
+from queue import Queue
+from pprint import pformat
+from influxdb import InfluxDBClient
+
+import constants
+from data_converter import DataConverter
+
+
+class dbManager(object):
+ def __init__(self,
+ host=constants.HOST,
+ port=constants.PORT,
+ user=None,
+ password=None,
+ buff_size=constants.BUFF_SIZE,
+ poll_delay=constants.POLL_DELAY,
+ app=None):
+ self.queue_buff_sz = buff_size
+ self.poll_delay = poll_delay
+
+ self.db_host = host
+ self.db_port = port
+ self.db_user = user
+ self.db_pass = password
+ self.write_queue = Queue(maxsize=self.queue_buff_sz)
+ self.stop_threads = False
+ self.app = app
+
+ for key in constants.DATABASE_DICT:
+ client = InfluxDBClient(host=self.db_host,
+ port=self.db_port,
+ username=self.db_user,
+ password=self.db_pass,
+ database=constants.DATABASE_DICT[key])
+ setattr(self, key.lower() + '_client', client)
+
+ def store(self, data):
+ """
+ Places data in the FIFO to be broadcast when
+ the database is not busy
+
+ :param: data: Data to be placed in FIFO
+ """
+ validation = 'OK'
+ err_code = 204
+ try:
+ self.write_queue.put(data)
+ except Exception as e:
+ validation = "** Write to Queue Failed. ** "
+ err_code = 402
+ print(validation, e)
+ self.app.logger.error(pformat({"error_code": err_code,
+ "info": validation, "exception": e}))
+ return validation, err_code
+
+ def get_db_client(self, metrics):
+ if "stats" in metrics:
+ client = metrics.replace("_stats", "") + "_client"
+ elif "tracking" in metrics:
+ client = metrics.replace("_tracking", "") + "_client"
+ else:
+ client = metrics + "_client"
+ if hasattr(self, client):
+ return getattr(self, client)
+ else:
+ self.app.logger.error("Invalid metrics %" % (metrics))
+
+ def write_db_direct(self, data):
+ """
+ Write data to database ( will block if database is busy )
+
+ :param: data: data to be written to database
+ """
+ db_client = self.get_db_client(data['metadata']['metrics'])
+
+ converted_data = DataConverter.convert_data(data)
+
+ if db_client:
+ if db_client.write_points(converted_data):
+ self.app.logger.info(
+ "Writing to InfluxDB hosted at %s "
+ "has been successful for %s!" %
+ (self.db_host, data['metadata']['metrics']))
+ else:
+ self.app.logger.error(
+ "Writing to InfluxDB hosted at %s "
+ "has FAILED for %s!" %
+ (self.db_host, data['metadata']['metrics']))
+ else:
+ self.app.logger.error(
+ "%s database not connected.." %
+ data['metadata']['metrics'])
+
+ def start_daemon(self):
+ """
+ Spawn a new thread that will consume data in the write FIFO
+ and place it into the databse
+ """
+
+ def write_db_loop():
+
+ while True:
+ try:
+ time.sleep(self.poll_delay)
+ if self.stop_threads:
+ self.app.logger.info(
+ "\n ** Shutting Down Database Writer **")
+ return
+ elif self.write_queue.qsize() > 0:
+ dt = self.write_queue.get()
+ # Write the data to the databse
+ self.write_db_direct(dt)
+ self.write_queue.task_done()
+ except Exception as e:
+ self.app.logger.error(
+ "** DB Writer Thread Failed. ** \n%s" % e)
+
+ self.db_write_thread = threading.Thread(target=write_db_loop)
+ self.db_write_thread.daemon = True
+ self.db_write_thread.start()
+ return self
+
+ def stop(self):
+ """
+ Flag which terminates db_write_threads loop
+ """
+ self.app.logger.info("** Setting stop_threads to True **")
+ self.stop_threads = True