testframework.datasinks.influx_client module

This module provides functionality to interact with influxDB Database

It includes functionality to create InfluxDB client with function to write and query data

class testframework.datasinks.influx_client.InfluxDBClient(token: str, org: str, url: str, bucket: str)

Bases: object

A class used to represent Influxdb client token = settings.INFLUXDB_TOKEN org = settings.INFLUXDB_ORG url = settings.INFLUXDB_URL bucket = settings.INFLUXDB_BUCKET

token

token to access influxdb

org

influx database org

url

address of influx database

bucket

folder of influxdb

write(PowerMeasurementCollection)

writes power measurement collection to influx database

query(query, org)

query data from org decribed in query

query(query, org)

get data described in query from org

query_data_frame(query, org)

Get data described in query from org as Pandas data frame

write_contactor(contactor: ContactorMetaData, state: ContactorState)

Writes the Contactor state to InfluxDB.

Parameters:
Returns:

None

write_dcsource(dcsource_var: DcSourceVariable)

Writes DcSource measurement to InfluxDB.

Parameters:

dcsource_var (DcSourceVariable) – The DcSourceVariable object containing the data to be written.

Returns:

None

write_ops(sensor_name: Enum, response: NumberDataPoint)

writes ops global sensor value to InfluxDB

write_ops_setting_annotation(setting_name: str | None, setting_value: float | None)

Writes annotation to InfluxDB for when changing a setting name/value. For example setting lpp or lpc with values. :param setting_name: The name of the setting. :type setting_name: str :param setting_value: The value of the setting. :type setting_value: float

Returns:

None

write_power(data: MeasurementCollection)

Writes Power Measurement Collection to InfluxDB.

Parameters:

data (MeasurementCollection) – The power measurement collection to be written.

Raises:

ValueError – If the measurement type is unknown.

write_setcontactor_annotation(contactor_point: str, contactor_state: str)

Writes annotation to InfluxDB for when changing a contactor setting name/value.

write_sw_version(sw_version: str)

write sw version to influxDB

write_temerature(temperature: float, temperature_tag: TemperatureTag)

Writes the Contactor state to InfluxDB.

Parameters:
Returns:

None

write_testbench_metadata(metadata: TestBench)

Write testbench metadata inventory, testEquipment