Source code for quail.fingerprint

#!/usr/bin/env python
from __future__ import division
from __future__ import print_function
from builtins import zip
from builtins import range
from builtins import object
import numpy as np
from scipy.spatial.distance import cdist
import warnings
from joblib import Parallel, delayed
import multiprocessing
from .egg import Egg
from .helpers import default_dist_funcs, parse_egg, shuffle_egg
from .analysis.analysis import _analyze_chunk
from .analysis.clustering import fingerprint_helper, _get_weights
from .distance import dist_funcs as builtin_dist_funcs

class Fingerprint(object):
    """
    Class for the memory fingerprint

    A memory fingerprint can be defined as a subject's tendency to cluster
    their recall responses with respect to one or more stimulus feature
    dimensions. What is a 'stimulus feature dimension' you ask? It is simply
    an attribute of the stimulus, such as its color, category, spatial
    location etc.

    Parameters
    ----------
    init : quail.Egg
        Data to initialize the fingerprint instance

    features : list
        Features to consider for fingerprint analyses, defaults to all.

    state : np.array
        The current fingerprint (an array of real numbers between 0 and 1,
        inclusive)

    n : int
        A counter specifying how many lists went into estimating the current
        fingerprint (initialized to 0)

    permute : bool
        A boolean flag specifying whether to use permutations to compute the
        fingerprint (default: False)

    nperms : int
        Number of permutations to use when permute is True (default: 1000)

    parallel : bool
        A boolean flag specifying whether to run the permutations in parallel
        (default: False)
    """
    def __init__(self, init=None, features='all', state=None, n=0,
                 permute=False, nperms=1000, parallel=False):

        self.history = []

        if init is not None:
            data = _analyze_chunk(init,
                                  analysis=fingerprint_helper,
                                  analysis_type='fingerprint',
                                  pass_features=True,
                                  permute=permute,
                                  n_perms=nperms,
                                  parallel=parallel)
            self.state = np.mean(data, 0)
            self.features = data.columns.values.tolist()
            self.history.append(self.state)
            n += 1
        else:
            self.state = None
            self.features = None

        self.n = n
    def update(self, egg, permute=False, nperms=1000, parallel=False):
        """
        In-place method that updates fingerprint with new data

        Parameters
        ----------
        egg : quail.Egg
            Data to update fingerprint

        Returns
        ----------
        None
        """

        next_weights = np.nanmean(_analyze_chunk(egg,
                                                 analysis=fingerprint_helper,
                                                 analysis_type='fingerprint',
                                                 pass_features=True,
                                                 permute=permute,
                                                 n_perms=nperms,
                                                 parallel=parallel).values, 0)

        if self.state is not None:

            # running average: weight the current state by the number of
            # lists that went into it, fold in the new weights, then
            # renormalize by the new count
            c = self.state * self.n

            # update state
            self.state = np.nansum(np.array([c, next_weights]), axis=0) / (self.n + 1)

        else:
            self.state = next_weights

        # increment n
        self.n += 1

        # update the history
        self.history.append(next_weights)

    def get_features(self):
        return self.features
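# --- Usage sketch (illustrative, not part of the library) -------------------
# Assuming `egg1` and `egg2` are quail.Egg objects with stimulus features
# attached (hypothetical variables, not defined in this module):
#
#   fp = Fingerprint(init=egg1)   # estimate an initial fingerprint
#   fp.update(egg2)               # fold another list into the running average
#   fp.get_features()             # feature dimensions being tracked
#   fp.state                      # per-feature clustering scores in [0, 1]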
class OptimalPresenter(object):
    """
    A class that reorders stimuli to optimize memory performance

    Given a memory fingerprint (a subject's tendency to cluster recall
    responses along stimulus feature dimensions), this class re-sorts a list
    of stimuli to either reinforce or disrupt that clustering.

    Parameters
    ----------
    strategy : str
        The reordering strategy: 'stabilize', 'destabilize' or 'random'
        (default: 'random')

    features : list
        Features to consider when reordering, defaults to all.

    params : dict
        Parameters used to reorder the list ('alpha', 'tau' and
        'fingerprint'); entries passed here override the defaults.

    fingerprint : quail.Fingerprint
        Fingerprint to reorder stimuli by; overrides the default (empty)
        fingerprint.
    """
    def __init__(self, strategy='random', features=None, params=None,
                 fingerprint=None):

        # set default params
        self.params = {
            'alpha': 4,
            'tau': 1,
            'fingerprint': Fingerprint()
        }

        # update with user defined params
        if params is not None:
            self.params.update(params)

        # a fingerprint passed directly takes precedence over the default
        # (and over any fingerprint passed in params)
        if fingerprint is not None:
            self.params['fingerprint'] = fingerprint

        self.strategy = strategy
        self.features = features
    def set_params(self, name, value):
        """
        Sets a parameter to a particular value
        """
        self.params[name] = value

    def get_params(self, name):
        """
        Gets the value of a parameter
        """
        return self.params[name]

    def set_strategy(self, strategy='random'):
        """
        Sets a reordering strategy
        """
        self.strategy = strategy

    def order(self, egg, method='permute', nperms=2500, strategy=None,
              distfun='correlation', fingerprint=None):
        """
        Reorders a list of stimuli to match a fingerprint

        Parameters
        ----------
        egg : quail.Egg
            Data to compute fingerprint

        method : str
            Method to re-sort the list. Can be 'random', 'permute', 'stick',
            'best_stick' or 'best_choice' (default: 'permute')

        nperms : int
            Number of permutations to use. Only used if method='permute'.
            (default: 2500)

        strategy : str or None
            The strategy to use to reorder the list. This can be 'stabilize',
            'destabilize', 'random' or None. If None, the self.strategy field
            will be used. (default: None)

        distfun : str or function
            The distance function used to compare the list fingerprint to the
            target fingerprint. Can be any distance function supported by
            scipy.spatial.distance.cdist. For more info, see:
            https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.cdist.html
            (default: 'correlation')

        fingerprint : quail.Fingerprint or np.array
            Fingerprint (or just the state of a fingerprint) to reorder by.
            If None, the list will be reordered according to the fingerprint
            attached to the presenter object.

        Returns
        ----------
        egg : quail.Egg
            Egg re-sorted to match fingerprint
        """

        def order_perm(self, egg, dist_dict, strategy, nperms, distfun,
                       fingerprint):
            """
            Re-sorts a list by computing random permutations of the list and
            choosing the one whose fingerprint is closest to (or farthest
            from) the target fingerprint.
            """

            # parse egg
            pres, rec, features, dist_funcs = parse_egg(egg)

            weights = []
            orders = []

            for i in range(nperms):
                x = rand_perm(pres, features, dist_dict, dist_funcs)
                weights.append(x[0])
                orders.append(x[1])

            weights = np.array(weights)
            orders = np.array(orders)

            # find the closest (or farthest) permutation
            if strategy == 'stabilize':
                closest = orders[np.nanargmin(cdist(np.array(fingerprint, ndmin=2), weights, distfun)), :].astype(int).tolist()
            elif strategy == 'destabilize':
                closest = orders[np.nanargmax(cdist(np.array(fingerprint, ndmin=2), weights, distfun)), :].astype(int).tolist()

            # return a re-sorted egg
            return Egg(pres=[list(pres[closest])], rec=[list(pres[closest])],
                       features=[list(features[closest])],
                       dist_funcs=dist_funcs)

        def order_best_stick(self, egg, dist_dict, strategy, nperms, distfun,
                             fingerprint):

            # parse egg
            pres, rec, features, dist_funcs = parse_egg(egg)

            results = Parallel(n_jobs=multiprocessing.cpu_count())(
                delayed(stick_perm)(self, egg, dist_dict, strategy)
                for i in range(nperms))

            weights = np.array([x[0] for x in results])
            orders = np.array([x[1] for x in results])

            # find the closest (or farthest) reordering
            if strategy == 'stabilize':
                closest = orders[np.nanargmin(cdist(np.array(fingerprint, ndmin=2), weights, distfun)), :].astype(int).tolist()
            elif strategy == 'destabilize':
                closest = orders[np.nanargmax(cdist(np.array(fingerprint, ndmin=2), weights, distfun)), :].astype(int).tolist()

            # return a re-sorted egg
            return Egg(pres=[list(pres[closest])], rec=[list(pres[closest])],
                       features=[list(features[closest])],
                       dist_funcs=dist_funcs)

        def order_best_choice(self, egg, dist_dict, nperms, distfun,
                              fingerprint):

            # get strategy
            strategy = self.strategy

            # parse egg
            pres, rec, features, dist_funcs = parse_egg(egg)

            results = Parallel(n_jobs=multiprocessing.cpu_count())(
                delayed(choice_perm)(self, egg, dist_dict)
                for i in range(nperms))

            weights = np.array([x[0] for x in results])
            orders = np.array([x[1] for x in results])

            # find the closest (or farthest) reordering
            if strategy == 'stabilize':
                closest = orders[np.nanargmin(cdist(np.array(fingerprint, ndmin=2), weights, distfun)), :].astype(int).tolist()
            elif strategy == 'destabilize':
                closest = orders[np.nanargmax(cdist(np.array(fingerprint, ndmin=2), weights, distfun)), :].astype(int).tolist()

            # return a re-sorted egg
            return Egg(pres=[list(pres[closest])], rec=[list(pres[closest])],
                       features=[list(features[closest])],
                       dist_funcs=dist_funcs)

        # if strategy is not set explicitly, default to the class strategy
        if strategy is None:
            strategy = self.strategy

        dist_dict = compute_distances_dict(egg)

        if fingerprint is None:
            fingerprint = self.get_params('fingerprint').state
        elif isinstance(fingerprint, Fingerprint):
            fingerprint = fingerprint.state
        else:
            print('using custom fingerprint')

        if (strategy == 'random') or (method == 'random'):
            return shuffle_egg(egg)
        elif method == 'permute':
            return order_perm(self, egg, dist_dict, strategy, nperms,
                              distfun, fingerprint)
        elif method == 'stick':
            return order_stick(self, egg, dist_dict, strategy, fingerprint)
        elif method == 'best_stick':
            return order_best_stick(self, egg, dist_dict, strategy, nperms,
                                    distfun, fingerprint)
        elif method == 'best_choice':
            return order_best_choice(self, egg, dist_dict, nperms, distfun,
                                     fingerprint)
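# --- Usage sketch (illustrative, not part of the library) -------------------
# Assuming `egg` is a quail.Egg with stimulus features and `fp` is a fitted
# Fingerprint (hypothetical variables):
#
#   presenter = OptimalPresenter(strategy='stabilize', fingerprint=fp)
#   reordered = presenter.order(egg, method='permute', nperms=2500)
#
# With strategy='stabilize' the permutation whose fingerprint is closest to
# the target is kept; 'destabilize' keeps the farthest one.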
def order_stick(presenter, egg, dist_dict, strategy, fingerprint):
    """
    Reorders a list according to strategy
    """

    def compute_feature_stick(features, weights, alpha):
        '''create a 'stick' of feature weights: each feature gets a number of
        segments proportional to its weight raised to alpha'''
        feature_stick = []
        for f, w in zip(features, weights):
            feature_stick += [f] * int(np.power(w, alpha) * 100)
        return feature_stick

    def reorder_list(egg, feature_stick, dist_dict, tau):

        # parse egg
        pres, rec, features, dist_funcs = parse_egg(egg)

        reordered_list = []
        reordered_features = []

        # start with a random word
        idx = np.random.choice(len(pres), 1)[0]

        # original inds
        inds = list(range(len(pres)))

        # keep track of the indices
        inds_used = [idx]

        # get the word
        current_word = pres[idx]

        # get the features dict
        current_features = features[idx]

        # append that word to the reordered list
        reordered_list.append(current_word)

        # append the features to the reordered list
        reordered_features.append(current_features)

        # loop over the word list
        for i in range(len(pres) - 1):

            # sample a feature from the stick
            feature_sample = feature_stick[np.random.choice(len(feature_stick), 1)[0]]

            # indices left
            inds_left = [ind for ind in inds if ind not in inds_used]

            # the words that haven't been used yet
            words_left = pres[inds_left]

            # distances from the current word along the sampled feature
            dists_left = np.array([dist_dict[current_word][word][feature_sample]
                                   for word in words_left])

            # normalize distances
            dists_left_max = np.max(dists_left)
            if dists_left_max > 0:
                dists_left_norm = dists_left / dists_left_max
            else:
                dists_left_norm = dists_left

            # invert the distances to turn distance into similarity
            dists_left_min = np.min(-dists_left_norm)
            dists_left_inv = -dists_left_norm - dists_left_min + .01

            # create a word stick: more similar words get more segments
            words_stick = []
            for word, dist in zip(words_left, dists_left_inv):
                words_stick += [word] * int(np.power(dist, tau) * 100)

            next_word = np.random.choice(words_stick)
            next_word_idx = np.where(pres == next_word)[0][0]

            inds_used.append(next_word_idx)
            reordered_list.append(next_word)
            reordered_features.append(features[next_word_idx])

            # the chosen word seeds the next distance computation
            current_word = next_word

        return Egg(pres=[reordered_list], rec=[reordered_list],
                   features=[[reordered_features]], dist_funcs=dist_funcs)

    # get params needed for list reordering
    features = presenter.get_params('fingerprint').get_features()
    alpha = presenter.get_params('alpha')
    tau = presenter.get_params('tau')
    weights = fingerprint

    # invert the weights if strategy is destabilize
    if strategy == 'destabilize':
        weights = 1 - weights

    # compute feature stick
    feature_stick = compute_feature_stick(features, weights, alpha)

    # reorder list
    return reorder_list(egg, feature_stick, dist_dict, tau)

def order_choice(presenter, egg, dist_dict, fingerprint):

    # get strategy
    strategy = presenter.strategy

    # get tau
    tau = presenter.get_params('tau')

    # get number of features
    nfeats = len(presenter.get_params('fingerprint').features)

    # parse egg
    pres, rec, features, dist_funcs = parse_egg(egg)

    # start with a random word
    idx = np.random.choice(len(pres), 1)[0]

    # original inds
    inds = list(range(len(pres)))

    # keep track of the indices
    inds_used = [idx]

    # get the word
    current_word = pres[idx]
    # get the features dict
    current_features = features[idx]

    # append that word to the reordered list
    reordered_list = [current_word]

    # append the features to the reordered list
    reordered_features = [current_features]

    # loop over the word list
    for i in range(len(pres) - 1):

        # indices left
        inds_left = [ind for ind in inds if ind not in inds_used]

        # the words that haven't been used yet
        words_left = pres[inds_left]

        # features left
        features_left = features[inds_left]

        # the weights the fingerprint would take on if each candidate word
        # were recalled next
        weights = np.zeros((len(words_left), nfeats))
        for widx, (word, feat) in enumerate(zip(words_left, features_left)):
            weights[widx, :] = compute_feature_weights_dict(
                list(pres), reordered_list + [word],
                reordered_features + [feat], dist_dict)

        # strategy-specific selection (left unused in the source):
        # if strategy == 'stabilize':
        #     pick = np.argmin(cdist(np.array(fingerprint, ndmin=2), weights, 'euclidean'))
        # elif strategy == 'destabilize':
        #     pick = np.argmax(cdist(np.array(fingerprint, ndmin=2), weights, 'euclidean'))

        # build a stick weighted by each candidate's distance to the target
        # fingerprint and sample the next word from it
        stick = []
        dist = cdist(np.array(fingerprint, ndmin=2), weights,
                     'correlation').reshape(len(words_left), 1)

        for didx, val in enumerate(dist):
            for _ in range(int((float(val) * tau) * 100)):
                stick.append(didx)

        pick = np.random.choice(stick, 1)[0]

        # get the next word
        next_word = words_left[pick]

        # and the idx of the next word
        next_word_idx = np.where(pres == next_word)[0][0]

        # append it to the inds already used
        inds_used.append(next_word_idx)

        # update the list
        reordered_list.append(next_word)
        reordered_features.append(features[next_word_idx])

    return Egg(pres=[reordered_list], rec=[reordered_list],
               features=[[reordered_features]], dist_funcs=dist_funcs)

# function to run one permutation for the parallelized list re-sorting
def rand_perm(pres, features, dist_dict, dist_funcs):

    # seed RNG (so each parallel worker gets a different stream)
    np.random.seed()

    # shuffle inds
    idx = np.random.permutation(len(pres))

    # shuffled pres
    pres_perm = list(pres[idx])

    # shuffled features
    features_perm = list(features[idx])

    # compute weights
    weights = compute_feature_weights_dict(pres_perm, pres_perm,
                                           features_perm, dist_dict)

    # save out the order
    orders = idx

    return weights, orders

def stick_perm(presenter, egg, dist_dict, strategy):
    """Computes weights for one reordering using the stick-breaking method"""

    # seed RNG (so each parallel worker gets a different stream)
    np.random.seed()

    # unpack egg
    egg_pres, egg_rec, egg_features, egg_dist_funcs = parse_egg(egg)

    # reorder; order_stick needs the target fingerprint, so pass the
    # presenter's current fingerprint state
    regg = order_stick(presenter, egg, dist_dict, strategy,
                       presenter.get_params('fingerprint').state)

    # unpack regg
    regg_pres, regg_rec, regg_features, regg_dist_funcs = parse_egg(regg)

    # get the order
    regg_pres = list(regg_pres)
    egg_pres = list(egg_pres)
    idx = [egg_pres.index(r) for r in regg_pres]

    # compute weights
    weights = compute_feature_weights_dict(list(regg_pres), list(regg_pres),
                                           list(regg_features), dist_dict)

    # save out the order
    orders = idx

    return weights, orders

def choice_perm(presenter, egg, dist_dict):
    """
    Reorder a list by iteratively selecting words that get closer to the
    target fingerprint
    """

    # seed RNG (so each parallel worker gets a different stream)
    np.random.seed()

    strategy = presenter.strategy

    # unpack egg
    egg_pres, egg_rec, egg_features, egg_dist_funcs = parse_egg(egg)

    # reorder; order_choice needs the target fingerprint, so pass the
    # presenter's current fingerprint state
    regg = order_choice(presenter, egg, dist_dict,
                        presenter.get_params('fingerprint').state)

    # unpack regg
    regg_pres, regg_rec, regg_features, regg_dist_funcs = parse_egg(regg)

    # get the order
    regg_pres = list(regg_pres)
    egg_pres = list(egg_pres)
    idx = [egg_pres.index(r) for r in regg_pres]

    # compute weights
    weights = compute_feature_weights_dict(list(regg_pres), list(regg_pres),
                                           list(regg_features), dist_dict)
    # save out the order
    orders = idx

    return weights, orders

def compute_distances_dict(egg):
    """ Creates a nested dict of distances: distances[word_a][word_b][feature] """

    pres, rec, features, dist_funcs = parse_egg(egg)

    pres_list = list(pres)
    features_list = list(features)

    # initialize dist dict
    distances = {}

    # for each word in the list
    for idx1, item1 in enumerate(pres_list):

        distances[item1] = {}

        # for each word in the list
        for idx2, item2 in enumerate(pres_list):

            distances[item1][item2] = {}

            # for each feature in dist_funcs
            for feature in dist_funcs:
                distances[item1][item2][feature] = builtin_dist_funcs[dist_funcs[feature]](
                    features_list[idx1][feature], features_list[idx2][feature])

    return distances

def compute_feature_weights_dict(pres_list, rec_list, feature_list, dist_dict):
    """
    Compute clustering scores along a set of feature dimensions

    Parameters
    ----------
    pres_list : list
        list of presented words

    rec_list : list
        list of recalled words

    feature_list : list
        list of feature dicts for presented words

    dist_dict : dict
        nested dict of distances for each feature, as returned by
        compute_distances_dict

    Returns
    ----------
    weights : list
        list of clustering scores, one per feature dimension
    """

    # initialize the weights object for just this list
    weights = {}
    for feature in feature_list[0]:
        weights[feature] = []

    # return the default fingerprint if there is not enough data
    if len(rec_list) < 2:
        print('Not enough recalls to compute fingerprint, returning default '
              'fingerprint (everything is .5)')
        for feature in feature_list[0]:
            weights[feature] = .5
        return [weights[key] for key in weights]

    # keep track of what has already been recalled
    past_words = []
    past_idxs = []

    # loop over words
    for i in range(len(rec_list) - 1):

        # grab current word
        c = rec_list[i]

        # grab the next word
        n = rec_list[i + 1]

        # if both recalled words are in the encoding list and haven't been
        # recalled before
        if (c in pres_list and n in pres_list) and (c not in past_words and n not in past_words):

            # for each feature
            for feature in feature_list[0]:

                # distance between current and next word
                c_dist = dist_dict[c][n][feature]

                # distances from the current word to the words that haven't
                # been recalled yet
                dists_filt = [dist_dict[c][j][feature] for j in dist_dict[c]
                              if j not in past_words]

                # percentile rank of the transition: the observed distance's
                # position among the remaining possibilities, with ties
                # averaged (rank 1 = largest distance)
                avg_rank = np.mean(np.where(np.sort(dists_filt)[::-1] == c_dist)[0] + 1)

                # compute the weight
                weights[feature].append(avg_rank / len(dists_filt))

            # keep track of what has been recalled already
            past_idxs.append(pres_list.index(c))
            past_words.append(c)

    # average over the cluster scores for a particular dimension
    for feature in weights:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)
            weights[feature] = np.nanmean(weights[feature])

    return [weights[key] for key in weights]
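# --- Shape sketch (illustrative values) --------------------------------------
# compute_distances_dict returns a nested dict indexed as
# distances[word_a][word_b][feature]. For a two-item list with a single
# 'category' feature it might look like:
#
#   dist_dict = {
#       'cat': {'cat': {'category': 0.0}, 'oak': {'category': 1.0}},
#       'oak': {'cat': {'category': 1.0}, 'oak': {'category': 0.0}},
#   }
#
# compute_feature_weights_dict then scores each recall transition by the
# percentile rank of its distance among the not-yet-recalled words (closer
# transitions score nearer to 1), returning one clustering score per feature,
# or 0.5 per feature when there are fewer than two recalls.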