Example DAQ scripts
All of these requests depend on forming a valid DRF request. Please refer to the DRF specification for detailed capabilities.
Live data is streamed from the control system and never ends. Once you begin iterating over the data with `async for evt_res in dpm`, the loop will not stop until you explicitly leave the context with a `break` or `exit`.
import acsys.dpm

async def my_app(con):
    # Setup context
    async with acsys.dpm.DPMContext(con) as dpm:
        # Add acquisition requests
        await dpm.add_entry(0, 'Z:CUBE_X.SETTING')
        await dpm.add_entry(1, 'Z:CUBE_Y.SETTING')

        # Start acquisition
        await dpm.start()

        # Process incoming data
        async for evt_res in dpm:
            if evt_res.is_reading_for(0):
                # This 0 argument matches the tag in `add_entry`
                print(evt_res)
            elif evt_res.is_reading_for(1):
                # This is a reading for the entry tagged 1
                pass
            else:
                # This is likely a status response
                pass

acsys.run_client(my_app)
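Because the stream never ends on its own, a script usually needs some stopping rule of its own. The sketch below illustrates one such rule (stop after a fixed number of readings) using only `asyncio`. `fake_stream` is a hypothetical stand-in for the DPM context so the example runs without a control system connection, and `take_readings` is an illustrative helper, not part of `acsys`.

```python
import asyncio
import itertools


async def fake_stream():
    # Hypothetical stand-in for a DPM context: yields readings forever.
    for n in itertools.count():
        await asyncio.sleep(0)  # hand control back to the event loop
        yield n


async def take_readings(stream, limit):
    # Collect `limit` readings (assumes limit >= 1), then leave with `break`.
    readings = []
    async for reading in stream:
        readings.append(reading)
        if len(readings) >= limit:
            break
    return readings


first_five = asyncio.run(take_readings(fake_stream(), 5))
print(first_five)
```

The same `break` pattern should carry over to the `async for evt_res in dpm` loop, with the counter incremented only for the readings you care about.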
Refer to the Data Sources page for various logger request strategies. Note that the `NODE` component is optional and included only for backward compatibility; using the `NODE` argument is not recommended. Instead, specify the DRF to match the logging rate in the data logger.
Data logger data is returned in chunks of 487 data points, followed by a final empty chunk that indicates the response is complete. You must include an exit condition, or the script will simply keep running without receiving any further responses.
import acsys.dpm

results = {
    'data': [],
    'stamps': []
}

async def my_app(con):
    # Setup context
    async with acsys.dpm.DPMContext(con) as dpm:
        # Add acquisition requests
        # This is a request for 60 seconds of data
        await dpm.add_entry(0, 'M:OUTTMP@P,1000,true<-LOGGER:1620754270000:1620754330000')

        # Start acquisition
        await dpm.start()

        # Process incoming data
        async for evt_res in dpm:
            if evt_res.is_reading_for(0):
                # Exit condition: empty data means we're done
                if evt_res.data == []:
                    break
                else:
                    # Append this chunk to the accumulated results
                    results['data'].extend(evt_res.data)
                    results['stamps'].extend(evt_res.micros)
            else:
                print(f'Status response: {evt_res}')

acsys.run_client(my_app)
print(results)
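The logger request above encodes its time window as two epoch timestamps that, judging from the 60-second example, are in milliseconds. A minimal sketch for building such a string from `datetime` values follows; `logger_request` and `to_epoch_ms` are hypothetical helpers, not part of `acsys`, and the format is inferred only from the example request.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(moment):
    # Whole milliseconds since the Unix epoch, using integer arithmetic.
    # `moment` must be timezone-aware; a naive datetime raises TypeError.
    return (moment - EPOCH) // timedelta(milliseconds=1)


def logger_request(device, start, end, period_ms=1000):
    # Hypothetical helper: mirrors the request string used in the script above.
    if end <= start:
        raise ValueError('end must be later than start')
    return (f'{device}@P,{period_ms},true'
            f'<-LOGGER:{to_epoch_ms(start)}:{to_epoch_ms(end)}')


start = datetime(2021, 5, 11, 17, 31, 10, tzinfo=timezone.utc)
request = logger_request('M:OUTTMP', start, start + timedelta(seconds=60))
print(request)
```

The resulting string can be passed as the second argument to `dpm.add_entry`. Check the DRF specification before relying on this format for other request types.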
Here is an example of a response from a data logger request. `tag` matches the first value passed to `dpm.add_entry`. `stamp` is related to the time of the request, not the data. `data` is a list of data points, index-aligned with the list in `micros`, which holds the data logger timestamps in microseconds since the Unix epoch. `meta` contains a collection of information about the requested control system parameter: `di` is the unique device index of the parameter, `name` is the string name of the parameter, `desc` is a string description of the parameter, and `units` is a string for the units of the value.
{
tag: 0,
stamp: 2021-05-11 17:44:24.888000+00:00,
data: [
53.35943970488751,
53.429207203857516,
53.429207203857516,
53.429207203857516,
53.429207203857516,
53.429207203857516,
53.429207203857516,
53.429207203857516,
53.498974702827525,
53.498974702827525,
53.429207203857516,
53.498974702827525,
53.498974702827525,
53.429207203857516,
53.498974702827525,
53.498974702827525,
53.498974702827525,
53.498974702827525,
53.498974702827525,
53.568742201797505,
53.568742201797505,
53.498974702827525,
53.568742201797505,
53.568742201797505,
53.568742201797505,
53.638509700767514,
53.568742201797505,
53.568742201797505,
53.568742201797505,
53.568742201797505,
53.638509700767514,
53.638509700767514,
53.568742201797505,
53.568742201797505,
53.568742201797505,
53.568742201797505,
53.568742201797505,
53.498974702827525,
53.568742201797505,
53.568742201797505,
53.568742201797505,
53.638509700767514,
53.568742201797505,
53.638509700767514,
53.498974702827525,
53.568742201797505,
53.638509700767514,
53.568742201797505,
53.638509700767514,
53.638509700767514,
53.568742201797505,
53.638509700767514,
53.638509700767514,
53.708277199737495,
53.638509700767514,
53.708277199737495,
53.708277199737495,
53.708277199737495,
53.708277199737495,
53.708277199737495
],
meta:
{
'di': 27235,
'name': 'M:OUTTMP',
'desc': 'Outdoor temperature (F)',
'units': 'DegF',
'format_hint': None
},
micros: [
1620754270342000,
1620754271342000,
1620754272341000,
1620754273341000,
1620754274341000,
1620754275341000,
1620754276340000,
1620754277340000,
1620754278340000,
1620754279339000,
1620754280339000,
1620754281339000,
1620754282338000,
1620754283338000,
1620754284338000,
1620754285337000,
1620754286337000,
1620754287337000,
1620754288336000,
1620754289336000,
1620754290336000,
1620754291335000,
1620754292335000,
1620754293335000,
1620754294334000,
1620754295334000,
1620754296333000,
1620754297333000,
1620754298333000,
1620754299332000,
1620754300332000,
1620754301331000,
1620754302331000,
1620754303331000,
1620754304330000,
1620754305330000,
1620754306330000,
1620754307329000,
1620754308329000,
1620754309329000,
1620754310328000,
1620754311328000,
1620754312328000,
1620754313327000,
1620754314327000,
1620754315327000,
1620754316326000,
1620754317326000,
1620754318326000,
1620754319325000,
1620754320325000,
1620754321324000,
1620754322324000,
1620754323324000,
1620754324323000,
1620754325323000,
1620754326323000,
1620754327322000,
1620754328322000,
1620754329322000
]
}
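Since `data` and `micros` are index-aligned, they can be zipped into timestamped samples. The sketch below converts the microsecond timestamps to timezone-aware UTC datetimes; `pair_samples` is a hypothetical helper, and the inputs are the first three points of the response above.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def pair_samples(data, micros):
    # Pair each value with its timestamp, converting microseconds since
    # the Unix epoch into a timezone-aware UTC datetime.
    if len(data) != len(micros):
        raise ValueError('data and micros must have the same length')
    return [(EPOCH + timedelta(microseconds=us), value)
            for value, us in zip(data, micros)]


# First three points copied from the example response above.
data = [53.35943970488751, 53.429207203857516, 53.429207203857516]
micros = [1620754270342000, 1620754271342000, 1620754272341000]

samples = pair_samples(data, micros)
for when, value in samples:
    print(when.isoformat(), value)
```

Adding a `timedelta` to a fixed epoch keeps the conversion in integer arithmetic, which avoids the rounding that can appear when dividing large microsecond counts as floats.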
FTP responses have the same structure as data logger responses. The FTP data source can be used to request a rate faster than 15Hz.
import acsys.dpm

async def my_app(con):
    # Setup context
    async with acsys.dpm.DPMContext(con) as dpm:
        # Add acquisition requests
        # This is a 20 Hz request via the FTP data source
        await dpm.add_entry(0, 'M:OUTTMP@P,20H,true<-FTP')

        # Start acquisition
        await dpm.start()

        # Process incoming data
        async for evt_res in dpm:
            if evt_res.is_reading_for(0):
                print(evt_res)

acsys.run_client(my_app)
Settings require a valid Kerberos ticket, and your Fermilab username must be registered to set a predetermined set of devices. The set of devices is determined by the `role` provided to `dpm.enable_settings`.
This script gets data every 1 second and sets the device after reading the current setting.
import acsys.dpm

async def my_app(con):
    # Setup context
    async with acsys.dpm.DPMContext(con) as dpm:
        # Check kerberos credentials and enable settings
        await dpm.enable_settings(role='testing')

        # Add acquisition requests
        await dpm.add_entry(0, 'Z:CUBE_X.SETTING@P,1H,true')

        # Start acquisition
        await dpm.start()

        # Process incoming data
        async for evt_res in dpm:
            if evt_res.is_reading_for(0):
                # Show current setting value
                print(evt_res)

                # Increment the current setting by 1
                await dpm.apply_settings([(0, evt_res.data + 1)])
                # NOTE: These settings are driven by request rate, in this case 1Hz.

acsys.run_client(my_app)
This script makes a single setting and exits. Note that the DRF request specifies the `N` event. This is the "never" event, and it tells DPM not to set up this entry for acquisition (i.e., the entry is only used for settings).
import acsys.dpm

async def my_app(con):
    # Setup context
    async with acsys.dpm.DPMContext(con) as dpm:
        # Check kerberos credentials and enable settings
        await dpm.enable_settings(role='testing')

        # Add acquisition requests
        await dpm.add_entry(0, 'Z:CUBE_X.SETTING@N')

        # Set an arbitrary value
        await dpm.apply_settings([(0, 42)])

acsys.run_client(my_app)
This script makes a single setting to three devices and exits.
import acsys.dpm

async def my_app(con):
    # Setup context
    async with acsys.dpm.DPMContext(con) as dpm:
        # Check kerberos credentials and enable settings
        await dpm.enable_settings(role='testing')

        # Add acquisition requests
        await dpm.add_entry(0, 'Z:CUBE_X.SETTING@N')
        await dpm.add_entry(1, 'Z:CUBE_Y.SETTING@N')
        await dpm.add_entry(2, 'Z:CUBE_Z.SETTING@N')

        # Set an arbitrary value
        await dpm.apply_settings([(0, 42),
                                  (1, 42),
                                  (2, 42)])

acsys.run_client(my_app)
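When the list of devices grows, writing the tags out by hand becomes error-prone, because each `(tag, value)` pair given to `apply_settings` must match the tag used in `add_entry`. One possible way to keep them in step is sketched below. `plan_settings` is a hypothetical helper that only builds the two lists and makes no `acsys` calls itself.

```python
def plan_settings(values_by_device):
    # Hypothetical helper: assign one tag per device, in insertion order.
    entries = []   # (tag, drf) pairs, one per dpm.add_entry call
    settings = []  # (tag, value) pairs, the list for dpm.apply_settings
    for tag, (device, value) in enumerate(values_by_device.items()):
        # The N ("never") event marks the entry as settings-only.
        entries.append((tag, f'{device}@N'))
        settings.append((tag, value))
    return entries, settings


entries, settings = plan_settings({
    'Z:CUBE_X.SETTING': 42,
    'Z:CUBE_Y.SETTING': 42,
    'Z:CUBE_Z.SETTING': 42,
})
print(entries)
print(settings)
```

Inside the DPM context, each `(tag, drf)` pair in `entries` would go to `await dpm.add_entry(tag, drf)`, followed by a single `await dpm.apply_settings(settings)`.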