297 lines
10 KiB
Python
297 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
from datetime import timedelta, datetime
|
|
from os import error
|
|
from time import sleep
|
|
from requests import get, post
|
|
import re
|
|
|
|
# ---------------------------------------------------------------------------
# META DATA
# Only used to label and organise the collected data.
# ---------------------------------------------------------------------------
USER_ID = ''        # e.g. Discord handle
PRINTER_MODEL = ''  # e.g. 'voron_v2_350'
MEASURE_TYPE = ''   # e.g. 'nozzle_pin', 'microswitch_probe', etc.
NOTES = ''          # anything noteworthy about this run; no "=" characters

# ---------------------------------------------------------------------------
# CONFIGURATION
# ---------------------------------------------------------------------------
# Printer URL (e.g. http://192.168.1.15); leave the default when the script
# runs locally on the printer host.
BASE_URL = 'http://127.0.0.1'

BED_TEMPERATURE = 105   # bed temperature for measurements
HE_TEMPERATURE = 100    # extruder temperature for measurements
MEASURE_INTERVAL = 1.0  # measurement interval (minutes)
N_SAMPLES = 3           # number of repeated measures per interval

# Hours to keep measuring once the bed temperature has been reached.
HOT_DURATION = 3
# Hours to continue measuring after heaters are disabled.
COOL_DURATION = 0

# G-code called on repeated measurements; single line/macro only.
MEASURE_GCODE = 'G28 Z'

# Chamber thermistor config name. Change to match your own, or "" if none.
# Will also work with temperature_fan configs.
CHAMBER_CONFIG = 'temperature_sensor chamber'
|
|
|
|
|
|
# Running sample counter; measure() increments it after every sample set.
index = 0

# Remove any errant "/" from either end of the configured address so request
# paths can be appended directly.
BASE_URL = BASE_URL.strip('/')

# Captures the number that immediately follows "stepper_z:" in a G-code
# response message.
MCU_Z_POS_RE = re.compile(r'(?P<mcu_z>(?<=stepper_z:)-*[0-9.]+)')

# Output file, named after the user and the moment the script was started.
DATA_FILENAME = "expansion_quant_%s_%s.csv" % (
    USER_ID,
    datetime.now().strftime("%Y-%m-%d_%H-%M-%S"),
)

# Back-dated by one full interval so the first measure() call fires at once.
last_measurement = datetime.now() - timedelta(minutes=MEASURE_INTERVAL)

# Placeholder a day in the future; wait_for_bedtemp() replaces it with the
# moment the bed reaches its target temperature.
start_time = datetime.now() + timedelta(days=1)
|
|
|
|
|
|
def _parse_gear_ratio(raw):
    """Return the overall reduction described by a ``gear_ratio`` setting.

    Accepts the raw config string ("80:16", or multi-stage "80:16, 57:11")
    as well as an already-parsed sequence of (a, b) pairs; stages multiply.
    An empty value yields 1.0.

    NOTE(review): the parsed-sequence form is assumed from how Klipper
    reports parsed settings in its configfile status -- confirm against the
    Klipper version in use.
    """
    if isinstance(raw, str):
        stages = [stage.split(':') for stage in raw.split(',') if stage.strip()]
    else:
        stages = raw
    ratio = 1.
    for driving, driven in stages:
        ratio *= float(driving) / float(driven)
    return ratio


def gather_metadata():
    """Collect run metadata from the script settings and the printer config.

    Queries ``/printer/objects/query?configfile`` on BASE_URL and reads the
    ``stepper_z`` and ``frame_expansion_compensation`` sections of the
    returned settings.

    Returns:
        dict: sections 'user', 'script', 'z_axis' and
        'frame_expansion_compensation', ready for write_metadata().

    Raises:
        KeyError: if the response lacks an expected key (for example when
            ``stepper_z`` or ``frame_expansion_compensation`` is missing).
    """
    resp = get(BASE_URL + '/printer/objects/query?configfile').json()
    config = resp['result']['status']['configfile']['settings']

    # Gather Z axis information
    config_z = config['stepper_z']
    if 'rotation_distance' in config_z:
        rot_dist = config_z['rotation_distance']
        steps_per = config_z['full_steps_per_rotation']
        micro = config_z['microsteps']
        if 'gear_ratio' in config_z:
            gear_ratio = _parse_gear_ratio(config_z['gear_ratio'])
        else:
            gear_ratio = 1.
        step_distance = (rot_dist / (micro * steps_per))/gear_ratio
    elif 'step_distance' in config_z:
        step_distance = config_z['step_distance']
    else:
        step_distance = "NA"
    max_z = config_z['position_max']
    # Prefer the second (slower) homing pass speed when one is reported.
    if 'second_homing_speed' in config_z:
        homing_speed = config_z['second_homing_speed']
    else:
        homing_speed = config_z['homing_speed']

    # Frame expansion configuration
    config_frame_comp = config['frame_expansion_compensation']

    # Organize
    meta = {
        'user': {
            'id': USER_ID,
            'printer': PRINTER_MODEL,
            'measure_type': MEASURE_TYPE,
            'measure_gcode': '"%s"' % MEASURE_GCODE,
            'notes': NOTES,
            'timestamp': datetime.now().strftime(
                "%Y-%m-%d_%H-%M-%S")
        },
        'script': {
            'data_structure': 3,
            'hot_duration': HOT_DURATION,
            'cool_duration': COOL_DURATION
        },
        'z_axis': {
            'step_dist': step_distance,
            'max_z': max_z,
            'homing_speed': homing_speed
        },
        'frame_expansion_compensation': config_frame_comp,
    }
    return meta
|
|
|
|
def write_metadata(meta):
    """Create DATA_FILENAME and write the metadata header into it.

    Each top-level key of ``meta`` becomes an upper-cased ``## SECTION ##``
    line followed by one ``# key=value`` line per entry. The section name is
    also echoed to stdout. Any existing file is overwritten (mode 'w').

    Args:
        meta: mapping of section name -> mapping of item -> value.
    """
    with open(DATA_FILENAME, 'w') as dataout:
        dataout.write('### METADATA ###\n')
        for section, entries in meta.items():
            print(section)
            dataout.write("## %s ##\n" % section.upper())
            for item in entries:
                dataout.write('# %s=%s\n' % (item, entries[item]))
        dataout.write('### METADATA END ###\n')
|
|
|
|
def send_gcode(cmd='', retries=1):
    """Send one G-code line/macro to ``/printer/gcode/script`` on BASE_URL.

    The request is re-issued on every attempt, and the script is passed as a
    query parameter so that it gets URL-encoded.

    Args:
        cmd: G-code script to run.
        retries: maximum number of attempts.

    Returns:
        bool: True as soon as a response carries a 'result' containing
        'ok'; False once all attempts have failed.
    """
    url = BASE_URL + "/printer/gcode/script"
    for i in range(retries):
        resp = post(url, params={'script': cmd})
        try:
            success = 'ok' in resp.json()['result']
        except (KeyError, ValueError):
            # KeyError: no 'result' key in the reply.
            # ValueError: the body was not valid JSON.
            success = False
        if success:
            return True
        print("G-code command '%s', failed. Retry %i/%i" % (cmd, i+1, retries))
    return False
|
|
|
|
def set_bedtemp(t=0):
    """Set the heater_bed target temperature (default 0).

    Args:
        t: target temperature, formatted with one decimal place.

    Raises:
        RuntimeError: if send_gcode() reports failure after 3 tries.
    """
    command = 'SET_HEATER_TEMPERATURE HEATER=heater_bed TARGET=%.1f' % t
    if send_gcode(command, retries=3):
        return
    raise RuntimeError("Bed temp could not be set.")
|
|
|
|
|
|
def set_hetemp(t=0):
    """Set the extruder heater target temperature (default 0).

    Args:
        t: target temperature, formatted with one decimal place.

    Raises:
        RuntimeError: if send_gcode() reports failure after 3 tries.
    """
    command = 'SET_HEATER_TEMPERATURE HEATER=extruder TARGET=%.1f' % t
    if send_gcode(command, retries=3):
        return
    raise RuntimeError("HE temp could not be set.")
|
|
|
|
def get_cached_gcode(n=1):
    """Request ``count=n`` entries from ``/server/gcode_store``.

    Returns:
        The ``gcode_store`` value found under ``result`` in the JSON reply.
    """
    endpoint = "/server/gcode_store?count=%i" % n
    payload = get(BASE_URL + endpoint).json()
    return payload['result']['gcode_store']
|
|
|
|
def query_mcu_z_pos():
    """Issue ``get_position`` and parse the Z position from the reply.

    Only a cached message with more than one ``stepper_z:`` match is
    accepted; the first match is the value returned.

    Returns:
        int: first ``stepper_z:`` value of the accepted message, or None
        when no cached message qualifies.
    """
    send_gcode(cmd='get_position')
    for entry in get_cached_gcode(n=1):
        hits = [m.group() for m in MCU_Z_POS_RE.finditer(entry['message'])]
        if len(hits) <= 1:
            continue
        return int(hits[0])
    return None
|
|
|
|
def query_frame_temp():
    """Return the ``frame_expansion_compensation`` temperature as a float."""
    reply = get(
        BASE_URL + '/printer/objects/query?frame_expansion_compensation'
    ).json()
    status = reply['result']['status']
    return float(status['frame_expansion_compensation']['temperature'])
|
|
|
|
def query_temp_sensors():
    """Read the extruder, bed and chamber temperatures in a single query.

    Returns:
        dict: 'chamber_temp', 'bed_temp', 'bed_target', 'he_temp' and
        'he_target'. 'chamber_temp' is -180.0 when the CHAMBER_CONFIG
        object (or its temperature) is absent from the reply.
    """
    query = '/printer/objects/query?extruder&heater_bed&%s' % CHAMBER_CONFIG
    status = get(BASE_URL + query).json()['result']['status']

    try:
        chamber = status[CHAMBER_CONFIG]['temperature']
    except KeyError:
        # Placeholder value marking "no chamber reading available".
        chamber = -180.

    bed = status['heater_bed']
    hotend = status['extruder']
    readings = {}
    readings['chamber_temp'] = chamber
    readings['bed_temp'] = bed['temperature']
    readings['bed_target'] = bed['target']
    readings['he_temp'] = hotend['temperature']
    readings['he_target'] = hotend['target']
    return readings
|
|
|
|
def collect_datapoint(index):
    """Run MEASURE_GCODE once and capture the resulting readings.

    Args:
        index: sample number used as the single key of the returned dict.

    Returns:
        dict: ``{index: {...}}`` holding the timestamp, MCU Z position,
        frame temperature and the chamber/bed/hotend readings.

    Raises:
        RuntimeError: if MEASURE_GCODE fails; both heater targets are reset
            to their defaults before raising.
    """
    measured = send_gcode(MEASURE_GCODE)
    if not measured:
        set_bedtemp()
        set_hetemp()
        raise RuntimeError(
            'MEASURE_GCODE (%s) failed. Stopping.' % MEASURE_GCODE)

    # Timestamp first, then the three queries in a fixed order.
    stamp = datetime.now()
    pos = query_mcu_z_pos()
    t_frame = query_frame_temp()
    t_sensors = query_temp_sensors()

    record = {
        'timestamp': stamp,
        'mcu_z': pos,
        'frame_t': t_frame,
        'chamber_t': t_sensors['chamber_temp'],
        'bed_t': t_sensors['bed_temp'],
        'bed_target': t_sensors['bed_target'],
        'he_t': t_sensors['he_temp'],
        'he_target': t_sensors['he_target'],
    }
    return {index: record}
|
|
|
|
def log_datapoint(datapoint: list):
    """Append one CSV row per sample to DATA_FILENAME.

    Args:
        datapoint: list of ``{index: record}`` dicts as returned by
            collect_datapoint(). The key of the first element is used to
            look up the record in every element.
    """
    _index = list(datapoint[0])[0]
    columns = ('mcu_z', 'frame_t', 'chamber_t', 'bed_t', 'bed_target',
               'he_t', 'he_target')
    with open(DATA_FILENAME, 'a') as datafile:
        for sample in datapoint:
            record = sample[_index]
            stamp = record['timestamp'].strftime("%Y/%m/%d-%H:%M:%S")
            # Rows start with a newline, so the file never ends with one.
            datafile.write(
                '\n%s,%s,%s,%s,%s,%s,%s,%s,%s'
                % (_index, stamp, *(record[name] for name in columns)))
|
|
|
|
def wait_for_bedtemp():
    """Keep measuring until the bed is within 0.5 of BED_TEMPERATURE.

    Polls the temperature sensors and calls measure() on each pass (which
    only samples when the interval has elapsed). When the bed temperature is
    reached, the global ``start_time`` is set to the current time.
    """
    global start_time
    print('Heating started')
    while True:
        temps = query_temp_sensors()
        if temps['bed_temp'] >= BED_TEMPERATURE-0.5:
            break
        measure()
        # Throttle polling like the loops in main() so the printer's HTTP
        # API is not queried in a tight loop.
        sleep(0.2)
    start_time = datetime.now()
    print('\nBed temp reached')
|
|
|
|
def measure():
    """Take a sample set if the interval has elapsed, else print a countdown.

    When at least MEASURE_INTERVAL minutes have passed since
    ``last_measurement``, N_SAMPLES datapoints are collected under the
    current ``index``, logged, and ``index`` is incremented. Otherwise the
    time to the next measurement is printed, along with the total remaining
    run time once ``start_time`` has been reached.
    """
    global last_measurement, index
    now = datetime.now()
    interval = timedelta(minutes=MEASURE_INTERVAL)

    if (now - last_measurement) >= interval:
        last_measurement = now
        data = []
        print('\r', ' '*50,end='\r')
        print('Measuring (#%i)...' % index, end='',flush=True)
        for n in range(N_SAMPLES):
            print('%i/%i...' % (n+1, N_SAMPLES), end='', flush=True)
            data.append(collect_datapoint(index))
        index += 1
        log_datapoint(data)
        print('DONE', " "*20)
        return

    # total_seconds() covers the whole span; .seconds drops the days part
    # and wraps around for negative spans.
    t_minus = ((last_measurement + interval) - now).total_seconds()
    if now >= start_time:
        end_time = start_time + timedelta(hours=HOT_DURATION+COOL_DURATION)
        total_remaining = max(0, (end_time - now).total_seconds()/60)
        print('%imin remaining. ' % total_remaining, end='')
    print('Next measurement in %02is' % t_minus, end='\r', flush=True)
|
|
|
|
def main():
    """Run the full measurement sequence.

    Steps: write the metadata header, home, disable frame compensation,
    write the CSV column header, heat bed and hotend, measure until the bed
    is at temperature, measure for HOT_DURATION hours, switch the bed off,
    keep measuring until HOT_DURATION+COOL_DURATION hours after the bed
    reached temperature, then switch the hotend off and re-enable frame
    compensation.

    Raises:
        RuntimeError: if homing fails, or from the heater/measurement
            helpers when a command fails.
    """
    write_metadata(gather_metadata())
    print("Starting!\nHoming...", end='')
    # Home all
    if send_gcode('G28'):
        print("DONE")
    else:
        raise RuntimeError("Failed to home. Aborted.")

    send_gcode('SET_FRAME_COMP enable=0')

    with open(DATA_FILENAME, 'a') as datafile:
        header = 'sample,time,mcu_pos_z,frame_t,chamber_t,bed_t,bed_target,he_t,he_target'
        datafile.write(header)

    set_bedtemp(BED_TEMPERATURE)
    set_hetemp(HE_TEMPERATURE)

    # Sets the global start_time once the bed is at temperature.
    wait_for_bedtemp()

    while True:
        now = datetime.now()
        if (now - start_time) >= timedelta(hours=HOT_DURATION):
            break
        measure()
        sleep(0.2)
    print('Hot measurements complete!')
    set_bedtemp()

    # start_time is deliberately NOT reset here: the cooldown phase ends
    # HOT_DURATION+COOL_DURATION hours after the bed reached temperature,
    # i.e. COOL_DURATION hours after the hot phase. The remaining-time
    # display in measure() uses the same end point.
    while True:
        now = datetime.now()
        if (now - start_time) >= timedelta(hours=HOT_DURATION+COOL_DURATION):
            break
        measure()
        sleep(0.2)

    set_hetemp()
    send_gcode('SET_FRAME_COMP enable=1')
    print('Cooldown measurements complete!')
|
|
|
|
if __name__ == "__main__":
    # Whatever stops the run, switch both heaters off and re-enable frame
    # compensation so the printer is not left heating unattended.
    try:
        main()
    except KeyboardInterrupt:
        set_bedtemp()
        set_hetemp()
        send_gcode('SET_FRAME_COMP enable=1')
        print("\nAborted by user!")
    except Exception:
        set_bedtemp()
        set_hetemp()
        send_gcode('SET_FRAME_COMP enable=1')
        # Re-raise so the original failure is still reported.
        raise
|