Hydraulic systems are commonly used in heavy equipment. In a hydraulic system, pressure applied to a contained fluid is transmitted throughout that fluid; the pressurized fluid acts on every part of the containing vessel and creates force. This force makes it possible, for instance, to lift heavy loads. A hydraulic circuit controls where fluid flows, as well as fluid pressure, and is responsible for transporting liquid through a set of interconnected components. It consists of multiple components and offers a range of areas to monitor to ensure optimal operation. In our use case we will focus on the valve.
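The principle at work is Pascal's law: force equals pressure times area, so a modest pressure acting on a large piston produces a large force. A minimal numeric illustration (the piston sizes and pressure below are hypothetical):

# Pascal's law illustration: F = p * A. All values are hypothetical.
pressure_pa = 5e6        # 50 bar of applied pressure, in pascals
small_piston_m2 = 0.001  # 10 cm^2 input piston
large_piston_m2 = 0.05   # 500 cm^2 output piston

output_force_n = pressure_pa * large_piston_m2   # 250 kN at the large piston
gain = large_piston_m2 / small_piston_m2         # 50x force multiplication
print(f'Output force: {output_force_n / 1000:.0f} kN ({gain:.0f}x gain)')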
The hydraulic system of interest consists of a primary working circuit and a secondary cooling-filtration circuit, both connected via the oil tank. The system operates in 60-second cycles. From an operations perspective, it is desirable to know when the system is in a sub-optimal condition so that pro-active action can be taken. Condition monitoring improves efficiency and maximizes machine uptime.
To understand in what condition the monitored component is operating (e.g. via a custom metric), measurements from the relevant sensors must be evaluated. In this use case we will solve the following problem: quantify the condition of the valve based on actual sensor measurements. This is not a typical forecasting case, as we do not predict values at future timestamps; instead, we build models that quantify the KPI of interest from the input variables at a given moment.
Business objective: | Reduce outages in the production process |
---|---|
Business value: | Reduce cost of down-times and inefficient operations |
KPI: | - |
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import logging
import json
import datetime
import tim_client
Credentials and logging
(Do not forget to fill in your credentials in the credentials.json file)
with open('credentials.json') as f:
    credentials_json = json.load(f)  # loading the credentials from credentials.json
TIM_URL = 'https://timws.tangent.works/v4/api' # URL to which the requests are sent
SAVE_JSON = False # if True - JSON requests and responses are saved to JSON_SAVING_FOLDER
JSON_SAVING_FOLDER = 'logs/' # folder where the requests and responses are stored
LOGGING_LEVEL = 'INFO'
level = logging.getLevelName(LOGGING_LEVEL)
logging.basicConfig(level=level, format='[%(levelname)s] %(asctime)s - %(name)s:%(funcName)s:%(lineno)s - %(message)s')
logger = logging.getLogger(__name__)
credentials = tim_client.Credentials(credentials_json['license_key'], credentials_json['email'], credentials_json['password'], tim_url=TIM_URL)
api_client = tim_client.ApiClient(credentials)
api_client.save_json = SAVE_JSON
api_client.json_saving_folder_path = JSON_SAVING_FOLDER
This dataset was gathered by measuring a hydraulic system run on a test rig. The system cyclically repeats constant load cycles (duration of 60 seconds) and measures values such as pressure, volume flow, and temperature, while the condition of the hydraulic components (cooler, valve, pump, and accumulator) is quantified.
One of the monitored conditions - the valve - is selected as the target variable.
Raw data files were transformed into a time series with a 1-minute sampling rate. The raw data were sampled at varying frequencies (e.g. pressure at 100 Hz, volume flow at 10 Hz), so aggregation was necessary to synchronize the predictors in the dataset.
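For illustration, such downsampling can be done with pandas; a minimal sketch (the raw file and column layout here are hypothetical, not the actual Kaggle files):

# Hypothetical sketch: downsample a raw 100 Hz pressure signal to 1-minute means
# so that all predictors end up on the same 1-minute sampling rate.
raw_ps1 = pd.read_csv('raw_PS1.csv', parse_dates=['time'], index_col='time')
ps1_1min = raw_ps1['PS1'].resample('1min').mean()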
Structure of CSV file:
Column name | Description | Type | Availability |
---|---|---|---|
time | Date | Timestamp column | |
valve | Condition of valve metric | Target | t-1 |
cycle | Operations cycle no. | Predictor | t+0 |
TS1 ... TS4 | Temperature (Celsius) | Predictor | t+0 |
PS1 ... PS6 | Pressure (bars) | Predictor | t+0 |
VS1 | Vibration (mm/s) | Predictor | t+0 |
FS1 ... FS2 | Volume flow (l/min) | Predictor | t+0 |
CP | Cooling power (virtual, kW) | Predictor | t+0 |
CE | Cooling efficiency (virtual, %) | Predictor | t+0 |
SE | Efficiency factor (%) | Predictor | t+0 |
EPS1 | Motor power (W) | Predictor | t+0 |
The meaning of the valve condition values is explained below:

Value | Meaning |
---|---|
100 | Optimal switching behavior |
90 | Small lag |
80 | Severe lag |
73 | Close to total failure |
If we want TIM to quantify the current condition from measurements, we want to predict the value of the target based on the predictor values at the same timestamp, so the last target record in the dataset must be left empty (NaN/None). TIM will then use the available predictors to predict this record. This situation is replicated by TIM to calculate results for all out-of-sample records.
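A sketch of how the last target record could be blanked out before the dataset is handed to TIM (the dataset used below already comes in this shape):

# Leave the most recent 'valve' value empty (NaN) - TIM will predict it
# from the predictor values available at the same timestamp.
df = pd.read_csv('data_valve.csv')
df.loc[df.index[-1], 'valve'] = np.nan
df.to_csv('data_valve.csv', index=False)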
The CSV files used in the experiments can be downloaded here.
The raw data files were acquired from Kaggle.
data = tim_client.load_dataset_from_csv_file('data_valve.csv', sep=',')
data
target_column = 'valve'
timestamp_column = 'time'
fig = go.Figure()
fig.add_trace( go.Scatter( x = data['time'], y=data['valve'], name='Valve') )
fig.show()
Parameters that need to be set are the prediction horizon (predictionTo) and the length of the backtest (backtestLength).
We also ask the engine for additional output so we can see details of the sub-models; for this we define the extendedOutputConfiguration parameter as well.
prediction_horizon = 1
backtest_length = int( data.shape[0] * .25 )
configuration_engine = {
'usage': {
'predictionTo': {
'baseUnit': 'Sample',
'offset': prediction_horizon
},
'backtestLength': backtest_length,
},
'features': [ 'Polynomial','Intercept','Identity' ],
'allowOffsets': False,
'extendedOutputConfiguration': {
'returnExtendedImportances': True
}
}
backtest = api_client.prediction_build_model_predict( data, configuration_engine )
backtest.status
backtest.result_explanations
simple_importances = backtest.predictors_importances['simpleImportances']
simple_importances = sorted(simple_importances, key = lambda i: i['importance'], reverse=True)
simple_importances = pd.DataFrame.from_dict( simple_importances )
fig = go.Figure()
fig.add_trace(go.Bar( x = simple_importances['predictorName'], y = simple_importances['importance'] ) )
fig.update_layout( title='Simple importances' )
fig.show()
extended_importances = backtest.predictors_importances['extendedImportances']
extended_importances = sorted(extended_importances, key = lambda i: i['importance'], reverse=True)
extended_importances = pd.DataFrame.from_dict( extended_importances )
fig = go.Figure()
fig.add_trace( go.Bar( x = extended_importances[ extended_importances['time'] == '[1]' ]['termName'],
y = extended_importances[ extended_importances['time'] == '[1]' ]['importance'] ) )
fig.update_layout( title='Features generated from predictors used by model')
fig.show()
backtest.aggregated_predictions[0]['accuracyMetrics'] # in-sample
backtest.aggregated_predictions[1]['accuracyMetrics'] # out-of-sample
def create_eval_df( predictions ):
    data2 = data.copy()
    data2[ 'time' ] = pd.to_datetime( data2[ 'time' ] ).dt.tz_localize('UTC')
    data2.rename( columns={'time': 'Timestamp'}, inplace=True )
    data2.set_index( 'Timestamp', inplace=True )
    eval_data = data2[ [ target_column ] ].join( predictions, how='inner' )
    return eval_data
edf_in_sample = create_eval_df( backtest.aggregated_predictions[0]['values'] )
fig = go.Figure()
fig.add_trace( go.Scatter( x = edf_in_sample.index, y=edf_in_sample['Prediction'], name='In-Sample') )
fig.add_trace( go.Scatter( x = edf_in_sample.index, y=edf_in_sample[ target_column ], name='Actual') )
fig.show()
edf_out_of_sample = create_eval_df( backtest.aggregated_predictions[1]['values'] )
fig = go.Figure()
fig.add_trace( go.Scatter( x = edf_out_of_sample.index, y=edf_out_of_sample['Prediction'], name='Out-of-Sample') )
fig.add_trace( go.Scatter( x = edf_out_of_sample.index, y=edf_out_of_sample[ target_column ], name='Actual') )
fig.show()
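As a sanity check, the out-of-sample accuracy metrics reported above can be recomputed directly from the evaluation dataframe:

# Manual cross-check of the engine's out-of-sample accuracy metrics
residuals = edf_out_of_sample[target_column] - edf_out_of_sample['Prediction']
mae = residuals.abs().mean()
rmse = np.sqrt((residuals ** 2).mean())
print(f'Out-of-sample MAE: {mae:.3f}, RMSE: {rmse:.3f}')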
We demonstrated how TIM can support pro-active condition monitoring. The information provided by the prediction can be used by the machine operator: if the predicted condition drops below a threshold value, appropriate action can be taken, as sketched below.
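A minimal sketch of such a check (the threshold of 90 is a hypothetical value; in practice it would be set together with domain experts):

VALVE_WARNING_THRESHOLD = 90  # hypothetical value - set with domain experts

# Flag out-of-sample records where the predicted valve condition falls below the threshold
alerts = edf_out_of_sample[edf_out_of_sample['Prediction'] < VALVE_WARNING_THRESHOLD]
if not alerts.empty:
    logger.warning('Valve condition below threshold at %d timestamps, first at %s',
                   len(alerts), alerts.index[0])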
Domain knowledge is a key factor in developing solutions for use cases like this one; it is important for proper target encoding as well as for setting the right warning thresholds.