Source code for peer.peer_func

#!/usr/bin/env python
"""
Functions to predict eye movements

Authors:
    - Jake Son, 2017-2018  (jake.son@childmind.org)  http://jakeson.me

"""

[docs]def scaffolding(): """ Creates the project folder and file hierarchy and returns pathnames Returns ------- _project_dir : string Pathname of the highest-level project directory _data_dir : string Pathname of the directory containing data _output_dir : string Pathname of the output directory _stimulus_path : string Pathname of the PEER calibration scan stimuli """ _project_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir)) _data_dir = os.path.abspath(os.path.join(_project_dir, 'data')) _stimulus_path = os.path.abspath(os.path.join(_project_dir, 'peer/stim_vals.csv')) if not os.path.exists(_data_dir): os.makedirs(_data_dir) print('\nThe "data" directory needs to contain at least one subdirectory that contains data before proceeding. Each ' 'subdirectory should contain at least one PEER file ' 'and one file used to estimate eye movements, such as an fMRI scan acquired during movie viewing or rest.\n') sys.exit() if not os.listdir(_data_dir): print('\nThe "data" directory is empty. The "data" directory needs to contain at least one subdirectory that ' 'contains data. Each subdirectory should contain at least two PEER files.\n') sys.exit() else: for dataset in [x for x in os.listdir(_data_dir) if not x.startswith('.')]: dataset_path = os.path.abspath(os.path.join(_data_dir, dataset)) if 'outputs' not in os.listdir(dataset_path): _dataset_dir = os.path.abspath(os.path.join(_data_dir, dataset)) _output_dir = os.path.abspath(os.path.join(_dataset_dir, 'outputs')) os.makedirs(_output_dir) else: _dataset_dir = os.path.abspath(os.path.join(_data_dir, dataset)) _output_dir = os.path.abspath(os.path.join(_dataset_dir, 'outputs')) return _project_dir, _data_dir, _output_dir, _stimulus_path
[docs]def set_parameters(_configs, new=False): """ Sets configuration parameters Parameters ---------- _configs : Dictionary containing configuration options from the config file (config.json) new : bool Do you want to start from a new file? Returns ------- _configs : Updated dictionary containing configuration options from the config file (config.json) """ if new: _configs = {x: "NA" for x in _configs} print('*Do not include single or double quotes*\n') if _configs['eye_mask_path'] == 'NA': _eye_mask_path = input('Add the full eye mask filepath: ') _configs['eye_mask_path'] = _eye_mask_path if _configs['train_file'] == 'NA': _train_file = input('Add the name of the file used for training [peer1.nii.gz]: ') if not _train_file: _configs['train_file'] = 'peer1.nii.gz' else: _configs['train_file'] = _train_file if _configs['test_file'] == 'NA': _test_file = input('Which file would you like to predict eye movements from? [movie.nii.gz]: ') if not _test_file: _configs['test_file'] = 'movie.nii.gz' else: _configs['test_file'] = _test_file if _configs['use_gsr'] == 'NA': _use_gsr = input('Use global signal regression? (y/n) [n]: ') if (not _use_gsr) or (_use_gsr == 'n'): _configs['use_gsr'] = "0" else: _configs['use_gsr'] = "1" if _configs['motion_scrub'] == 'NA': _use_ms = input('Use motion scrubbing? (y/n) [n]: ') if (not _use_ms) or (_use_ms == 'n'): _configs['use_ms'] = "0" _configs['motion_threshold'] = "0" _configs['motion_scrub'] = "Not implemented" elif _use_ms == 'y': _configs['use_ms'] = "1" _motion_scrub_filename = input('Add the filename of the CSV that contains the framewise displacement \ time series [motion_ts.csv]: ') if not _motion_scrub_filename: _configs['motion_scrub'] = 'motion_ts.csv' else: _configs['motion_scrub'] = _motion_scrub_filename _motion_threshold = input('Add a motion threshold for motion scrubbing [.2]: ') if not _motion_threshold: _configs['motion_threshold'] = ".2" else: _configs['motion_threshold'] = _motion_threshold with open('peer/config.json', 'w') as f: json.dump(_configs, f) return _configs
[docs]def load_config(): """ Loads configuration parameters as a dictionary from config.json Returns ------- _configs : Dictionary containing configuration options from the config file (config.json) """ with open('peer/config.json', 'r') as f: _configs = json.load(f) if any('NA' in x for x in list(_configs.values())): print('\nOne or more of the necessary parameters are missing. You can either:\n' ' 1) add the content using the command line, or\n' ' 2) exit and manually edit the config.json file\n') _configs = set_parameters(_configs, new=False) return _configs
[docs]def load_data(_filepath): """ Loads fMRI data Parameters ---------- _filepath : string Pathname of the NIfTI file used to train a model or predict eye movements Returns ------- _data : float 4D numpy array containing fMRI data """ nib_format = nib.load(_filepath) _data = nib_format.get_data() print('Training data Loaded') return _data
[docs]def global_signal_regression(_data, _eye_mask_path): """ Performs global signal regression Parameters ---------- _data : float Data from an fMRI scan as a 4D numpy array _eye_mask_path : Pathname for the eye mask NIfTI file (the standard MNI152 2mm FSL template is used for the linked preprint) Returns ------- _data : 4D numpy array containing fMRI data after global signal regression """ eye_mask = nib.load(_eye_mask_path).get_data() global_mask = np.array(eye_mask, dtype=bool) regressor_map = {'constant': np.ones((_data.shape[3], 1))} regressor_map['global'] = _data[global_mask].mean(0) X = np.zeros((_data.shape[3], 1)) csv_filename = '' for rname, rval in regressor_map.items(): X = np.hstack((X, rval.reshape(rval.shape[0], -1))) csv_filename += '_' + rname X = X[:, 1:] Y = _data[global_mask].T B = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(Y) Y_res = Y - X.dot(B) _data[global_mask] = Y_res.T print('GSR completed.') return _data
[docs]def motion_scrub(_ms_filename, _data_dir, _motion_threshold): """ Determines volumes with high motion artifact Parameters ---------- _ms_filename : string Pathname of the CSV file containing the framewise displacement per time point for a given fMRI scan _data_dir : string Pathname of the directory containing data _motion_threshold : float Threshold for high motion (framewise displacement, defined by Power et al. 2012) Returns ------- _removed_indices : int List of volumes to remove for motion scrubbing """ file_path = os.path.abspath(os.path.join(_data_dir, _ms_filename)) with open(file_path, 'r') as f: reader = csv.reader(f) censor_pre = [x for x in reader] nuissance_vector = [float(x) for x in censor_pre[0]] _removed_indices = [i for i, x in enumerate(nuissance_vector) if x >= float(_motion_threshold)] return _removed_indices
[docs]def prepare_data_for_svr(_data, _removed_time_points): """ Preprocess fMRI data prior to SVR model generation Parameters ---------- _data : float 4D numpy array containing fMRI data after global signal regression _removed_time_points : int List of volumes to remove for motion scrubbing Returns ------- _processed_data : float List of numpy arrays, where each array contains the averaged intensity values for each calibration point _calibration_points_removed : int List of calibration points removed if all volumes for a given calibration point were high motion """ if _removed_time_points is not None: print(str('The {}th volume(s) were removed.').format(_removed_time_points)) else: _removed_time_points = [] _processed_data = [] _calibration_points_removed = [] for num in range(int(_data.shape[3]/5)): vol_set = [x for x in np.arange(num * 5, (num + 1) * 5) if x not in _removed_time_points] if len(vol_set) != 0: _processed_data.append(np.average(_data[:, :, :, vol_set], axis=3).ravel()) else: _calibration_points_removed.append(num) if (_calibration_points_removed) and (_removed_time_points): print(str('The {}th calibration point(s) were removed.').format(_calibration_points_removed)) elif (not _calibration_points_removed) and (_removed_time_points): print(str('No calibration points were removed.')) return _processed_data, _calibration_points_removed
[docs]def train_model(_data, _calibration_points_removed, _stimulus_path): """ Trains the SVR model used in the PEER method Parameters ---------- _data : float List of numpy arrays, where each array contains the averaged intensity values for each calibration point _calibration_points_removed : int List of calibration points removed if all volumes for a given calibration point were high motion _stimulus_path : string Pathname of the PEER calibration scan stimuli Returns ------- _xmodel : SVR model to estimate eye movements in the x-direction _ymodel : SVR model to estimate eye movements in the y-direction """ monitor_width = 1680 monitor_height = 1050 fixations = pd.read_csv(_stimulus_path) x_targets = np.repeat(np.array(fixations['pos_x']), 1) * monitor_width / 2 y_targets = np.repeat(np.array(fixations['pos_y']), 1) * monitor_height / 2 x_targets = list(np.delete(np.array(x_targets), _calibration_points_removed)) y_targets = list(np.delete(np.array(y_targets), _calibration_points_removed)) _xmodel = SVR(kernel='linear', C=100, epsilon=.01) _xmodel.fit(_data, x_targets) _ymodel = SVR(kernel='linear', C=100, epsilon=.01) _ymodel.fit(_data, y_targets) return _xmodel, _ymodel
[docs]def save_model(_xmodel, _ymodel, _train_file, _ms, _gsr, _output_dir): """ Saves the SVR models used in the PEER method Parameters ---------- _xmodel : SVR model to estimate eye movements in the x-direction _ymodel : SVR model to estimate eye movements in the y-direction _train_file : string Pathname of the NIfTI file used to train the SVR model _ms : bool Whether or not to use motion scrubbing _gsr : bool Whether or not to use global signal regression _output_dir : Pathname of the output directory """ x_name = os.path.abspath(os.path.join(_output_dir, str('xmodel_' + _train_file.strip('.nii.gz') + '_ms' + _ms + '_gsr' + _gsr + '.pkl'))) y_name = os.path.abspath(os.path.join(_output_dir, str('ymodel_' + _train_file.strip('.nii.gz') + '_ms' + _ms + '_gsr' + _gsr + '.pkl'))) joblib.dump(_xmodel, x_name) joblib.dump(_ymodel, y_name) print('SVR Models saved. PEER can now be applied to new data.')
[docs]def load_model(_output_dir): """ Loads the SVR models used to estimate eye movements Parameters ---------- _output_dir : string Pathname of the output directory Returns ------- _xmodel : SVR model to estimate eye movements in the x-direction _ymodel : SVR model to estimate eye movements in the y-direction _xname : string Filename of the model used to estimate eye movements in the x-direction _yname : stringf Filename of the model used to estimate eye movements in the y-direction """ model_selection = [x for x in os.listdir(_output_dir) if ('pkl' in x) and x.startswith('xmodel')] if len(model_selection) > 1: options = [] options_index = list(np.arange(len(model_selection))) print("List of available models:\n") for i, model in enumerate(model_selection): model_option = str(' {}: {}').format(str(i), model.replace('xmodel_', '')) options.append(model.replace('xmodel', '')) print(model_option) print('\n') selected_model_index = int(input(str('Which model type? ({}): ').format(options_index))) selected_model = str('xmodel' + options[selected_model_index]) selected_model_path = os.path.abspath(os.path.join(_output_dir, selected_model)) _xname = selected_model.replace('pkl', 'csv') _yname = selected_model.replace('x', 'y').replace('pkl', 'csv') _xmodel = joblib.load(selected_model_path) _ymodel = joblib.load(selected_model_path.replace('x', 'y')) else: _xname = model_selection[0] _yname = model_selection[0].replace('x', 'y') x_selected_model_path = os.path.abspath(os.path.join(_output_dir, _xname)) y_selected_model_path = os.path.abspath(os.path.join(_output_dir, _yname)) _xmodel = joblib.load(x_selected_model_path) _ymodel = joblib.load(y_selected_model_path) _xname = model_selection[0].replace('pkl', 'csv') _yname = model_selection[0].replace('pkl', 'csv').replace('x', 'y') return _xmodel, _ymodel, _xname, _yname
[docs]def predict_fixations(_xmodel, _ymodel, _data): """ Predict fixations Parameters ---------- _xmodel : SVR model to estimate eye movements in the x-direction _ymodel : SVR model to estimate eye movements in the y-direction _data : 4D numpy array containing fMRI data used to predict eye movements (e.g., movie data) Returns ------- _x_fix : float List of predicted fixations in the x-direction _y_fix : float List of predicted fixations in the y-direction """ _x_fix = _xmodel.predict(_data) _y_fix = _ymodel.predict(_data) return _x_fix, _y_fix
[docs]def save_fixations(_x_fix, _y_fix, _xname, _yname, _output_dir): """ Save predicted fixations Parameters ---------- _x_fix : float List of predicted fixations in the x-direction _y_fix : float List of predicted fixations in the y-direction _xname : string Filename of the model used to estimate eye movements in the x-direction _yname : string Filename of the model used to estimate eye movements in the y-direction _output_dir : string Pathname of the output directory Returns ------- _fix_xname : string Filename of the CSV containing fixation predictions in the x-direction _fix_yname : string Filename of the CSV containing fixation predictions in the y-direction """ _fix_xname = str('xfixations_' + _xname) _fix_yname = str('yfixations_' + _yname) x_path = os.path.abspath(os.path.join(_output_dir, _fix_xname)) y_path = os.path.abspath(os.path.join(_output_dir, _fix_yname)) x = open(x_path, 'w') for fix in _x_fix: x.write(str("{0:.5f},").format(round(fix, 5))) x.close() y = open(y_path, 'w') for fix in _y_fix: y.write(str("{0:.5f},").format(round(fix, 5))) y.close() return _fix_xname, _fix_yname
[docs]def estimate_em(_x_fix, _y_fix, _fix_xname, _fix_yname, _output_dir): """ Parameters ---------- _x_fix : float List of predicted fixations in the x-direction _y_fix : float List of predicted fixations in the y-direction _fix_xname : string Filename of the CSV containing fixation predictions in the x-direction _fix_yname : string Filename of the CSV containing fixation predictions in the y-direction _output_dir : string Pathname of the output directory """ _em_xname = str('x_eyemove' + _fix_xname) _em_yname = str('y_eyemove' + _fix_yname) x_em = [] y_em = [] for num in range(len(_x_fix)-1): x_em.append(abs(_x_fix[num] - _x_fix[num+1])) y_em.append(abs(_y_fix[num] - _y_fix[num+1])) x_path = os.path.abspath(os.path.join(_output_dir, _em_xname)) y_path = os.path.abspath(os.path.join(_output_dir, _em_yname)) x = open(x_path, 'w') for fix in x_em: x.write(str("{0:.5f},").format(round(fix, 5))) x.close() y = open(y_path, 'w') for fix in y_em: y.write(str("{0:.5f},").format(round(fix, 5))) y.close()