Source code for firefly.data_reader.reader

import h5py
import os 
import shutil 
import subprocess
import pandas as pd
import requests
import numpy as np
import time

from .settings import Settings,default_settings
from .tween import TweenParams
from .particlegroup import ParticleGroup
from .json_utils import write_to_json,load_from_json

[docs]class Reader(object): """ This class provides a framework to unify the Settings and ParticleGroup classes to make sure that the user can easily produce firefly compatible files. You should use this Reader as a base class for any custom readers you may build (see :class:`firefly.data_reader.SimpleReader` or :class:`firefly.data_reader.FIREreader` for example). """
[docs] def __repr__(self): """Implementation of builtin function __repr__ :return: mystr, the pretty rendering of a reader :rtype: str """ my_str = str(self.__class__).split('.')[-1].replace("'>","") my_str += " with %d particle groups" % len(self.particleGroups) return my_str
[docs] def __init__( self, datadir=None, file_prefix='Data', clean_datadir=True, max_npart_per_file=10**5, write_startup='append', write_only_data=False, settings:Settings=None, tweenParams:TweenParams=None, **kwargs): """Base initialization method for Reader instances. A Reader will read data and produce firefly compatible :code:`.json` files. :param datadir: the sub-directory that will contain your JSON files, relative to your :code:`$HOME directory`. , defaults to :code:`$HOME/<file_prefix>` :type datadir: str, optional :param file_prefix: Prefix for any :code:`.json` files created, :code:`.json` files will be of the format: :code:`<file_prefix><parttype>_%d.json`, defaults to 'Data' :type file_prefix: str, optional :param clean_datadir: flag to delete all :code:`.json` files in the :code:`datadir`. Strictly not necessary (since :code:`filenames.json` will be updated) but it is good to clean up after yourself., defaults to False :type clean_datadir: bool, optional :param max_npart_per_file: the maximum number of particles saved per :code:`.json` file, don't use too large a number or you will have trouble loading the individual files in., defaults to 10**4 :type max_npart_per_file: int, optional :param write_startup: flag for how to treat the :code:`startup.json` file. Takes three values: :code:`'append'`: appends :code:`datadir` to :code:`startup.json`, :code:`True`: overwrites :code:`startup.json` with a single entry, :code:`datadir`, :code:`False`: does not alter :code:`startup.json`, , defaults to 'append' :type write_startup: str/bool, optional :param write_only_data: flag for whether writeToDisk should exclude Settings and tweenParams instances as well as the filenames.json and startup.json files. If True, then the reader will only export the raw data files (useful for not overwriting things or only loading single files at a time to append onto an existing dataset), defaults to False :type write_only_data: bool, optional :param settings: a :class:`firefly.data_reader.Settings` instance, defaults to a new default :class:`firefly.data_reader.Settings` instance. :type settings: :class:`firefly.data_reader.Settings`, optional :param tweenParams: :class:`firefly.data_reader.TweenParams` instance, defaults to None :type tweenParams: :class:`firefly.data_reader.TweenParams`, optional :raises TypeError: raised if anything other than a :class:`firefly.data_reader.Settings` instance is passed to :code:`settings` :raises TypeError: raised if anything other than a :class:`firefly.data_reader.TweenParams` instance is passed to :code:`tweenParams` """ ## where will firefly look for jsons ## we're in data_reader, so let's steal the ## path from there self.static_data_dir = os.path.abspath( os.path.join( os.path.realpath(__file__), '..', '..', 'static', 'data')) if datadir is None: ## default to saving directly into the firefly directory print("datadir is None, defaulting to %s/%s"%(self.static_data_dir,file_prefix)) datadir = os.path.join( self.static_data_dir, file_prefix) elif datadir[:1] != os.sep: ## datadir is a relative path. ## Let's assume they want to save w.r.t. their home directory? datadir = os.path.join(os.environ['HOME'],datadir) if datadir[-1:]==os.sep: ## get rid of the trailing '/' if it's there datadir=datadir[:-1] self.datadir = datadir ## determine whether the directory we're saving the ## actual data in is a sub-directory of firefly/static/data. ## if not, set self.needs_soft_link = True self.__splitAndValidateDatadir() ## how are we managing multiple datasets? self.write_startup = write_startup ## should we exclude settings, tween params, filenames.json, and startup.json when writing to disk? self.write_only_data = write_only_data #set the maximum number of particles per data file self.max_npart_per_file = max_npart_per_file ## file_prefix for the datafiles e.g. Data self.file_prefix = file_prefix #remove existing data files in the datadir before adding more? self.clean_datadir = clean_datadir ## array of particle groups self.particleGroups: list[ParticleGroup] = [] settings_kwargs,self.particlegroup_kwargs = split_kwargs(kwargs) if settings is not None: if settings.__class__.__name__ != 'Settings': ## fun fact, assert isinstance(settings,Settings) won't work with jupyter notebooks ## that use %load_ext autoreload raise TypeError("Make sure you use a Settings instance to specify firefly settings.") ## we'll use the default ones then else: settings = Settings(**settings_kwargs) self.settings_path = os.path.join( self.static_data_dir, os.path.basename(datadir), self.file_prefix+settings.settings_filename) if not self.clean_datadir and os.path.isfile(self.settings_path): print(f"Importing existing settings from {self.settings_path}") settings.loadFromJSON(self.settings_path,loud=False) self.settings = settings ## and apply the settings that were passed as keyword arguments for key,value in settings_kwargs.items(): self.settings[key] = value if tweenParams is not None: if tweenParams.__class__.__name__ != 'TweenParams': raise TypeError("Make sure you use a TweenParams instance to specify fly-through paths.") self.tweenParams:TweenParams = tweenParams
def __splitAndValidateDatadir(self,loud=True): """[summary] :param loud: flag to print status information to the console, defaults to False :type loud: bool, optional :return: [description] :rtype: [type] """ """ Ensures that files will be output to a location that firefly can read, as well as splits the path so that filenames.json references files correctly. """ ## split the JSON directory path into the folder ## that will actually contain the JSON files and the path leading ## to it. hard_data_path,short_data_path = os.path.split(self.datadir) ## determine if we will need to soft link this data or if ## it was saved to some kind of self.static_data_dir (whether this one ## or another). self.needs_soft_link = False for validate in ['index.html','static']: if validate not in os.listdir( os.path.join(hard_data_path,"..","..")): if loud: print( "datadir: {} -- ".format(self.datadir)+ "is not a sub-directory of firefly/static/data. "+ "\nThis may produce confusing or inoperable results. "+ "As such, we will create a symlink for you when you "+ "writeToDisk.") self.needs_soft_link = True break return hard_data_path,short_data_path
[docs] def addParticleGroup(self,particleGroup): """ Track a new :class:`~firefly.data_reader.ParticleGroup` instance in this :class:`firefly.data_reader.Reader` instance's :code:`particleGroups` array and to the attached :class:`firefly.data_reader.Settings` instance. :param particleGroup: a :class:`~firefly.data_reader.ParticleGroup` instance that contains particle data for an individual UI element. :type particleGroup: :class:`firefly.data_reader.ParticleGroup` """ ## data validation of new ParticleGroup happened in its initialization self.particleGroups = np.append( self.particleGroups, [particleGroup],axis=0) ## add this particle group to the reader's settings file self.settings.attachSettings(particleGroup)
def createOctrees(self,mask=None,max_npart_per_node=10**4,nrecurse=10,**kwargs): ## validate mask length entry , each particle group ## should have a true or false entry if mask is not None and len(mask) != len(self.particleGroups): raise ValueError( f"mask must be of length: {len(self.particleGroups):d}"+ f" not length {len(mask):d}.") for i in range(len(self.particleGroups)): if mask is None or mask[i]: ## the old particle group will get garbage collected... eventually?? self.particleGroups[i] = self.particleGroups[i].spawn_octree_pg( self.datadir, max_npart_per_node, nrecurse=nrecurse, build=True, **kwargs)
[docs] def writeToDisk( self, loud=False, write_to_disk=True, symlink=True, file_extension='.ffly', **kwargs): """Creates all the necessary JSON files to run firefly and ensures they are properly linked and cross-referenced correctly using the :func:`firefly.data_reader.Settings.outputToJSON` and :func:`firefly.data_reader.ParticleGroup.outputToJSON` methods (and :func:`firefly.data_reader.TweenParams.outputToJSON` if one is attached). :param loud: flag to print status information to the console, defaults to False :type loud: bool, optional :param write_to_disk: flag that controls whether data is saved to disk (:code:`True`) or only converted to a string and stored in :code:`self.JSON` (:code:`False`), defaults to True :type write_to_disk: bool, optional :param symlink: flag for whether a soft link should be created between where the data is stored on disk and the :code:`self.static_data_dir` directory (:code:`True`) or whether it should be saved directly to :code:`self.static_data_dir` directory (:code:`False`). Note that :code:`symlink=False` will not _also_ save results in :code:`self.datadir`, defaults to True :type symlink: bool, optional :param file_extension: File extension for data files created, one of `.ffly` (binary) or `.json` (ASCII). :type file_extension: str, optional :return: :code:`self.JSON` or :code:`""` according to :code:`write_to_disk` :rtype: str """ ## path where data needs to be visible to firefly static_data_path = os.path.join( self.static_data_dir, os.path.basename(self.datadir)) ## where to put hard JSON files, if no symlink then we will ## need to save directly to static_data_path datadir = self.datadir if symlink else static_data_path if not symlink: ## we've been asked to ignore the request to put the ## hard files in self.datadir and instead actually put them in ## firefly/static/data/<short_data_path> if loud: print("Outputting files to: %s instead of: %s as originally specified."%( static_data_path, self.datadir)) ## if we don't actually want to write to disk then we don't need ## to do any filesystem tasks if write_to_disk: ## make the output directory if not os.path.isdir(datadir): os.makedirs(datadir) ## soft link between the "hard" data and firefly/static/data ## firefly can be run locally if self.needs_soft_link and symlink: try: ## create a symlink so that data can ## be read from a "sub-directory" os.symlink(datadir,static_data_path) except FileExistsError: if os.path.islink(static_data_path): ## remove the existing symlink os.unlink(static_data_path) elif os.path.isdir(static_data_path): shutil.rmtree(static_data_path) ## create a symlink so that data can ## be read from a "sub-directory" os.symlink(datadir,static_data_path) ## initialize an output array to contain all the jsons and their names file_array = [] ## write each particleGroup to JSON using their own method ## and save the filenames into a dictionary for filenames.json manifest_file = os.path.join(datadir,'filenames.json') if not self.clean_datadir and os.path.isfile(manifest_file): filenamesDict = load_from_json(manifest_file) else: filenamesDict = {} for particleGroup in self.particleGroups: print(particleGroup) if loud: print("Outputting:",particleGroup) this_file_array,filenames_and_nparts = particleGroup.writeToDisk( datadir, file_prefix=self.file_prefix, loud=loud, max_npart_per_file=self.max_npart_per_file, clean_datadir=self.clean_datadir if particleGroup is self.particleGroups[0] else False, not_reader=False, file_extension=file_extension, write_to_disk=write_to_disk, **kwargs) ## append the JSON arrays for this particle group file_array += this_file_array filenamesDict[particleGroup.UIname]=list(filenames_and_nparts) ## output the settings.json file if not self.write_only_data: file_array +=[self.dumpSettingsToJSON(symlink,write_to_disk,loud)] ## format and output the filenames.json file filenamesDict['options'] = [(os.path.join( os.path.basename(datadir), self.file_prefix+self.settings.settings_filename),0)] ## write a tweenParams file if a TweenParams instance is attached to reader if hasattr(self,'tweenParams') and self.tweenParams is not None: filenamesDict['tweenParams'] = [(os.path.join( os.path.basename(datadir), self.tweenParams.filename),0)] file_array+=[self.tweenParams.outputToJSON( datadir, file_prefix=self.file_prefix, loud=loud, write_to_disk=write_to_disk, not_reader=False)] ## None -> returns JSON string file_array +=[( manifest_file, write_to_json( filenamesDict, manifest_file if write_to_disk else None))] ## None -> returns JSON string ## handle the startup.json file, may need to append or totally overwrite startup_file = os.path.join( self.static_data_dir, 'startup.json') ## relative path from .js interpreter (which runs in /firefly/static) ## to this dataset startup_path = os.path.join("data",os.path.basename(datadir)) if self.write_startup == 'append' and os.path.isfile(startup_file): startup_dict = load_from_json(startup_file) maxx = 0 for key in startup_dict.keys(): if int(key) > maxx: maxx = int(key) ## it's already in startup.json if startup_dict[key] == startup_path: startup_file = None maxx-=1 ## since we'll add 1 below startup_dict[str(maxx+1)]=startup_path file_array+=[( ## recreate in case we overwrote the startup_file variable in loop above os.path.join(self.static_data_dir,'startup.json'), write_to_json( startup_dict, startup_file if write_to_disk else None))] ## None -> returns JSON string elif self.write_startup: file_array+=[( startup_file, write_to_json( {"0":startup_path}, startup_file if write_to_disk else None))] ## None -> returns JSON string if not write_to_disk and file_extension.lower() == '.json': ## create a single "big JSON" with all the data in it in case ## we want to send dataViaFlask JSON_dict = dict(file_array) self.JSON = write_to_json(JSON_dict,None) self.JSON_dict = JSON_dict else: ## we wrote all the little JSONs to disk individually already ## so we won't store a single big JSON ## since we only have None's in there anyway self.JSON = "" return self.JSON
def dumpSettingsToJSON( self, symlink=True, write_to_disk=True, loud=False): ## path where data needs to be visible to firefly static_data_path = os.path.join( self.static_data_dir, os.path.basename(self.datadir)) ## where to put hard JSON files, if no symlink then we will ## need to save directly to static_data_path datadir = self.datadir if symlink else static_data_path return self.settings.outputToJSON( datadir, file_prefix=self.file_prefix, loud=loud, write_to_disk=write_to_disk, not_reader=False)
[docs] def outputToDict(self): """ Formats the data in the reader to a python dictionary using the attached :func:`firefly.data_reader.Settings.outputToDict` and :func:`firefly.data_reader.ParticleGroup.outputToDict` methods (and :func:`firefly.data_reader.TweenParams.outputToDict` if one is attached). :return: :code:`outputDict`, a dictionary structured like the javascript object in the firefly webapp. Can be sent to the js interpreter via Flask using the :func:`firefly.data_reader.Reader.sendDataViaFlask` method. :rtype: dict """ outputDict = {} outputDict['parts'] = {} ## create each particleGroup's dictionary using their own method for particleGroup in self.particleGroups: outputDict['parts'][particleGroup.UIname] = particleGroup.outputToDict() ## store the settings file in the output dictionary outputDict['options'] = self.settings.outputToDict() return outputDict
[docs] def sendDataViaFlask(self,port=5500,room=None): """ Exports the data as if it were being dumped to disk but instead stores it as a string. Then feeds this string to the js interpreter via Flask. :param port: port that the firefly Flask server is being hosted on, defaults to 5500 :type port: int, optional :param room: the name of the flask session to send data to on the specified port. defaults to None :type room: str, optional :raises ValueError: if `room` is not specified. """ ## retrieve a single "big JSON" of all the mini-JSON sub-files. self.writeToDisk( loud=False, write_to_disk=False, file_extension='.json') # we do not need to raise an error anymore, now that we have a default room # if room is None: raise ValueError( # f"Enter a valid room on port: {int(port):d} using the room (keyword) argument.") if (room is not None): self.JSON_dict['room'] = room ## None as the second argument (filename) -> converts dictionary ## to JSON string rather than writing to a file self.JSON = write_to_json(self.JSON_dict,None) ## post the json to the listening url data_input ## defined in if (room is None): print(f"Posting data on port {int(port):d}...",end='') else: print(f"Posting data to room {room} on port {int(port):d}...",end='') f'http://localhost:{port:d}/data_input', json=self.JSON) print("data posted!")
[docs] def copyFireflySourceToTarget( self, target=None, flask_templates=False, dump_data=True, overwrite=True, init_gh_pages=False, GHREPONAME=None, GHUSER=None, GHOAUTHTOKENPATH=None, **kwargs): """ Copies the necessary source files to run a stand-alone instance of firefly on the web. Optionally, will also initialize a new GitHub repository with GitHub pages, a free web-hosting service, so that this stand-alone instance can be accessed by anyone over the internet. :param target: target directory to save firefly source files to, defaults to :code:`$HOME/my_Firefly` :type target: str, optional :param flask_templates: flag for whether the flask template files should also be copied. In general, these files are not required to run firefly over the internet but may be useful if one intends to run firefly locally in this new directory, defaults to False :type flask_templates: bool, optional :param dump_data: flag for whether the data stored in this reader should also be saved to this new stand-alone firefly directory (vs. only the firefly source files), defaults to True :type dump_data: bool, optional :param overwrite: flag for whether the existing target static directory should be purged before anything is copied over or written to disk, defaults to True :type overwrite: bool, optional :param init_gh_pages: flag to run :code:`firefly/bin/` in an attempt to initialize a new github repository with GitHub Pages, a free web-hosting service provided by GitHub, enabled, defaults to False :type init_gh_pages: bool, optional :param GHREPONAME: repository name that we should attempt to create (note that a non-critical error will be raised if the repo already exists), defaults to the last sub-directory of :code:`target` :type GHREPONAME: str, optional :param GHUSER: GitHub username, defaults to :code:`$USER` :type GHUSER: str, optional :param GHOAUTHTOKENPATH: filepath to a file containing only the OAUTH token generated at:, defaults to :code:`$HOME/.github.token` :type GHOAUTHTOKENPATH: str, optional :raises FileNotFoundError: if :code:`GHOAUTHTOKENPATH` cannot be resolved :raises FileNotFoundError: if :code:`firefly/bin/` cannot be found. :return: returns a list of strings, :code:`[target]` if :code:`init_git_pages=False` otherwise the output of running :code:`firefly/bin/`. :rtype: list of str """ ## handle default argument(s) if target is None: target = os.path.join(os.environ['HOME'],'my_Firefly') elif target[:1] != os.sep: target = os.path.join(os.environ['HOME'],target) if not os.path.isdir(target): ## create the directory because it doesn't exist os.makedirs(target) elif overwrite: ## purge the old static directory shutil.rmtree(os.path.join(target,'static')) ## identify source directory src = os.path.abspath(os.path.join( os.path.dirname(__file__), "..")) ## copy the index.html file shutil.copy( os.path.join(src,'index.html'), target) ## copy the flask templates if requested ## in case someone wants to run flask from ## this directory rather than "just" host on ## the web if flask_templates: shutil.copytree( os.path.join(src,'templates'), os.path.join(target,'templates')) for obj in os.listdir(os.path.join(src,'static')): this_target = os.path.join(target,'static',obj) if obj == 'data': ## make an empty data directory rather than copy ## whatever jsons happen to be there if not os.path.isdir(this_target): os.mkdir(this_target) else: ## copy the source files if os.path.exists(this_target): shutil.rmtree(this_target) shutil.copytree(os.path.join(src,'static',obj),this_target) if dump_data: try: ## stash the old stat_data_dir old = self.static_data_dir ## dump to this dummy target self.static_data_dir = os.path.join(target,'static','data') if not os.path.isdir(self.static_data_dir): os.makedirs(self.static_data_dir) self.writeToDisk(symlink=False,**kwargs) except: raise finally: ## replace the old stat_data_dir self.static_data_dir = old ## attempts to initialize a github pages site in order to ## host this copied version of firefly on the web. if not init_gh_pages: return [target] else: ## default GHREPONAME to the directory name of the target if GHREPONAME is None: GHREPONAME = os.path.split(target)[-1] ## default GHOAUTHOTOKENPATH to ~/.github.token (my arbitrary choice) if GHOAUTHTOKENPATH is None: GHOAUTHTOKENPATH = '.github.token' ## if we were passed a relative path prepend the $HOME path if GHOAUTHTOKENPATH[:1] != os.sep: GHOAUTHTOKENPATH = os.path.join( os.environ['HOME'], GHOAUTHTOKENPATH) ## default to the current user name if GHUSER is None: GHUSER = os.environ['USER'] print(f"GHUSER not provided, defaulting to {GHUSER}") print('Waiting for 3 seconds in case you want to interrupt this',end='') for i in range(3): print('.',end='') time.sleep(1) print() if not os.path.isfile(GHOAUTHTOKENPATH): raise FileNotFoundError(f"No OAUTH token file matching {GHOAUTHTOKENPATH}, generate one from"+ " and write it to disk somewhere.") print(f"Initializing a new GitHub repository at {target} with\n" + f"\tGHREPONAME: {GHREPONAME}\n" + f"\tGHUSER: {GHUSER}\n" + f"\tGHOAUTHTOKENPATH: {GHOAUTHTOKENPATH}\n") ## check if the executable exists executable = os.path.abspath( os.path.join( self.static_data_dir, '..','..','bin','')) if not os.path.isfile(executable): raise FileNotFoundError("Missing executable, cannot initialize a new repository.") ## stash the current directory old = os.getcwd() try: ## move to the target directory os.chdir(target) ## intialize the github repo lines = subprocess.check_output( ["bash",executable,GHREPONAME,GHUSER,GHOAUTHTOKENPATH]) lines = lines.decode("utf-8").split("\n") return lines except: ## raise any error that might've come up raise finally: ## move back to the current directory os.chdir(old)
[docs]class ArrayReader(Reader): """A wrapper to :class:`firefly.data_reader.Reader` that stores raw numpy array data without opening anything from disk. """
[docs] def __init__( self, coordinates, velocities=None, UInames=None, fields=None, field_names=None, decimation_factor=1, write_to_disk=True, loud=True, **kwargs): """Takes a list of opened numpy arrays and creates a :class:`firefly.data_reader.Reader` instance with their data. Takes :class:`firefly.data_reader.Reader` passthrough kwargs. :param coordinates: raw coordinate data, ignores path_to_data if passed. Can either pass N,3 np.array which is interpreted as a single particle group's coordinates or a jagged (M,N_m,3) list of np.arrays which is interpreted as M many particle groups with each having N_m particles :type coordinates: list, optional :param velocities: raw velocity data, Can either pass N,3 np.array which is interpreted as a single particle group's velocities or a jagged (M,N_m,3) list of np.arrays which is interpreted as M many particle groups with each having N_m particles :type velocities: list, optional :param UInames: list of particle group UInames :type UInames: list of str :param fields: list of field arrays corresponding to each point of the coordinate data, Can pass: length N np.array which is interpreted as a single particle group's field, (N_m,N_f) list which is interpreted as a single particle group's fields, or a jagged (M,N_m,N_f) list of np.arrays which is interpreted as M many particle groups with each having N_m particles with N_f fields, defaults to [] :type fields: list of shape (M,N_f,N_m) :param field_names: strings to name fields by, defaults to ['field%d' for field in fields] :type field_names: list of strs :param decimation_factor: factor by which to reduce the data randomly i.e. :code:`data=data[::decimation_factor]`, defaults to 1 :type decimation_factor: int, optional :param write_to_disk: flag that controls whether data is saved to disk (:code:`True`) or only converted to a string and stored in :code:`self.JSON` (:code:`False`), defaults to True :type write_to_disk: bool, optional :param loud: flag to print status information to the console, defaults to False :type loud: bool, optional :raises np.AxisError: if the coordinate data cannot be interpreted :raises ValueError: if the number of particle groups does not match the number of coordinate arrays :raises np.AxisError: if the field data cannot be interpreted :raises np.AxisError: if the field names cannot be interpreted """ super().__init__(**kwargs) ## wrap coordinate array if necessary if len(np.shape(coordinates))==2 and np.shape(coordinates)[-1]==3: ## passed a single list of coordinates, prepend an axis for the single group coordinates = [coordinates] velocities = [velocities] if UInames is not None: UInames = [UInames] if fields is not None: fields = [fields] elif len(np.shape(coordinates[0]))==2 and np.shape(coordinates[0])[-1]==3: ## passed a jagged array of different coordinates pass else: raise np.AxisError("Uninterpretable coordinate array, either pass a single (N,3) array"+ " or a jagged list of 'shape' (M,N_m,3)") ngroups = len(coordinates) npartss = np.array([len(coords) for coords in coordinates]) ## check fields and wrap a single field for a single particle group fielderror = np.AxisError("Uninterpretable field array, either pass a single N array" " or a jagged list of 'shape' (M,N_fm,N_pm)") if fields is not None: ## special case and want to allow convenient/inconsistent syntax, ## so let's just handle it independently if ngroups == 1: if type(fields) != dict and type(fields[0]) != dict: try: fields = np.reshape(fields,(1,-1,npartss[0])) except: raise fielderror else: ## is everything square? unlikely but you know one can dream try: fields = np.reshape(fields,(ngroups,-1,npartss[0])) except: ## yeah okay i didn't think that was gonna work anyway ## let's build up a jagged list temp_fields = [] ## first check that we have enough fields for each group if len(fields) != ngroups: raise fielderror ## okay, we've got something to work with try: ## this should work *if* users passed correctly formatted input for igroup in range(ngroups): this_fields = fields[igroup] ## if this_fields is a dictionary it will be understood by ParticleGroup below if type(this_fields) != dict: np.reshape(this_fields,(-1,npartss[igroup])) temp_fields.append(this_fields) ## go read the documentation! except: raise fielderror ## swap the variable names fields = temp_fields nfieldss = [len(this_fields) for this_fields in fields] ## check field names and generate them if necessary fieldnameerror = np.AxisError("Uninterpretable field array, either pass a single N array"+ " or a jagged list of 'shape' (M,N_fm,N_pm)") if field_names is not None: ## special case and want to allow convenient/inconsistent syntax, ## so let's just handle it independently if ngroups == 1: try: field_names = np.array(field_names).reshape(ngroups,nfieldss[0]).tolist() except: raise fieldnameerror else: for igroup in range(ngroups): if len(field_names[igroup]) != nfieldss[igroup]: raise fieldnameerror ## so some fields were passed but not field_names elif fields is not None: field_names = [] for igroup in range(ngroups): ## field names will be extracted from dictionary in ParticleGroup initialization these_names = (["field%d"%j for j in range(nfieldss[igroup])] if type(fields[0]) != dict else None) field_names.append(these_names) ## initialize default particle group names if UInames is None: UInames = ['PGroup_%d'%i for i in range(len(coordinates))] elif len(UInames) != len(coordinates): raise ValueError("%d UInames does not match passed"%len(UInames)+ " number of %d coordinate arrays"%(len(coordinates))) ## loop through each of the particle groups for i,(coords,UIname) in enumerate(zip(coordinates,UInames)): ## initialize a firefly particle group instance firefly_particleGroup = ParticleGroup( UIname, coords, velocities[i], decimation_factor=decimation_factor, field_arrays=None if fields is None else fields[i], field_names=None if field_names is None else field_names[i], **self.particlegroup_kwargs) ## attach the instance to the reader self.addParticleGroup(firefly_particleGroup) ## either write a bunch of mini JSONs to disk or store ## a single big JSON in self.JSON self.writeToDisk( write_to_disk=write_to_disk, loud=loud and write_to_disk)
[docs]class SimpleReader(ArrayReader): """ A wrapper to :class:`firefly.data_reader.ArrayReader` that attempts to flexibily open generically formatetd data with minimal interaction from the user. """
[docs] def __init__( self, path_to_data, field_names=None, write_to_disk=True, extension='.hdf5', loud=True, csv_sep=',', **kwargs): """A simple reader that will take as minimal input the path to a (set of) .hdf5 file(s) and extract each top level group's 'Coordinates' or 'x','y','z' values. Coordinate data must saved either as an (N,3) array or in 3 separate (N,) arrays indexed by x,y, and z. Keyword arguments are passed to the parent :class:`~firefly.data_reader.Reader`. :param path_to_data: path to .hdf5/csv file(s; can be a directory) :type path_to_data: str :param field_names: strings to try and extract from .hdf5 file. If file format is .csv then there must be a header row :type field_names: list of strs :param extension: file extension to attempt to open. Accepts only :code:`'.hdf5'` or :code:`'.csv'`, defaults to '.hdf5' :type extension: str, optional :param loud: flag to print status information to the console, defaults to False :type loud: bool, optional :param csv_sep: separator between values in the csv, defaults to ',' :type csv_sep: str, optional :raises ValueError: if :code:`path_to_data` is not a directory or doesn't contain :code:`extension` :raises ValueError: if :code:`extension` is passed anything either than :code:`'.hdf5'` or :code:`'.csv'` :raises ValueError: if particle_groups is not None, extension = .csv, and len(particle_groups) != len(detected filenames in path_to_data) """ if extension in path_to_data: ## path_to_data points directly to a single data file fnames = [path_to_data] elif os.path.isdir(path_to_data): ## path_to_data points to a directory containing .hdf5 data files fnames = [] ## gather the individual filenames for this_fname in os.listdir(path_to_data): if extension in this_fname: fnames += [os.path.join(path_to_data,this_fname)] else: ## couldn't interpret what we were given raise ValueError( "%s needs to point to an %s file or "%(path_to_data,extension)+ "a directory containing %s files."%extension) if extension == '.hdf5': ## take the contents of the "first" file to define particle groups and keys with h5py.File(fnames[0],'r') as handle: particle_groups = list(handle.keys()) ## Gadget data has a header as well as particle groups ## so we need to ignore it if 'Header' in particle_groups: particle_groups.pop(particle_groups.index("Header")) elif extension == '.csv': particle_groups = [fname.replace(extension,'').split(os.sep)[-1] for fname in fnames] else: raise ValueError("Invalid extension %s, must be .hdf5 or .csv"%extension) print("Opening %d files and %d particle types..."%(len(fnames),len(particle_groups))) coordss = [] ## initialize bucket for holding field data fieldsss = None if field_names is None else [] for i,particle_group in enumerate(particle_groups): coordinates = None fieldss = {} if extension == '.hdf5': ## need to loop through each file and get the matching coordinates, ## concatenating them. for fname in fnames: coordinates = self.__getHDF5Coordinates(fname,particle_group,coordinates) fieldss = self.__getHDF5Fields(fname,particle_group,field_names,fieldss) elif extension == '.csv': ## each file corresponds to a single set of coordinates and fields coordinates = self.__getCSVCoordinates(fnames[i],csv_sep) fieldss = self.__getCSVFields(fnames[i],field_names,csv_sep) else: raise ValueError("Invalid extension %s, must be .hdf5 or .csv"%extension) coordss.append(coordinates) if fieldsss is not None: fieldsss.append(fieldss) ## create a default reader instance super().__init__( coordinates=coordss, UInames=particle_groups, fields=fieldsss, write_to_disk=write_to_disk, loud=loud, **kwargs)
def __getCSVCoordinates(self,fname,csv_sep=','): """Simple parser for opening a CSV file and extracting its coordinates. :param fname: full filepath to :code:`.csv` file :type fname: str :param csv_sep: separator between values in the csv, defaults to ',' :type csv_sep: str, optional :return: :code:`coordinates` :rtype: np.ndarray """ full_df = pd.read_csv(fname,sep=csv_sep) coordinates = np.empty((full_df.shape[0],3)) if 'x' in full_df and 'y' in full_df and 'z' in full_df: coordinates[:,0] = full_df['x'] coordinates[:,1] = full_df['y'] coordinates[:,2] = full_df['z'] else: coordinates[:,0] = full_df.iloc[:,0] coordinates[:,1] = full_df.iloc[:,1] coordinates[:,2] = full_df.iloc[:,2] return coordinates def __getCSVFields(self,fname,field_names,csv_sep=','): """Use pandas to interpret csv data in order to load field data matching field names (field names that are not present are ignored). :param fname: full filepath to :code:`.csv` file :type fname: str :param field_names: strings to try and extract from .csv. There must be a header row :type field_names: list of strs :param csv_sep: separator between values in the csv, defaults to ',' :type csv_sep: str, optional :return: dictionary of fields and field name pairings :rtype: dict if field_names is not None else None """ if field_names is None: return fieldss = {} ## trust pandas to open the .csv and detect if there's a header \_(ツ)_/ full_df = pd.read_csv(fname,sep=csv_sep) for field_name in field_names: if field_name in full_df: fieldss[field_name] = full_df[field_name].to_numpy() return fieldss def __getHDF5Coordinates( self, fname, particle_group=None, coordinates=None): """Simple parser for opening an hdf5 file and extracting its coordinates. :param fname: full filepath to :code:`.hdf5` file :type fname: str :param particle_group: :code:`group_name` in the :code:`.hdf5` file that contains this particle type's coordinates. If :code:`None` then it is assumed the Coordinates dataset is saved at the top level of the file, defaults to None :type particle_group: str :param coordinates: existing coordinate array that should be appended to, defaults to None :type coordinates: np.ndarray, optional :raises TypeError: if :code:`coordinates` is not of type :code:`np.ndarray` :raises np.AxisError: if :code:`coordinates` does not have shape (N,3) :return: coordinates, the opened coordinate array from :code:`fname` :rtype: np.ndarray """ with h5py.File(fname,'r') as handle: ## valide the existing coordinate array or initialize it in the first place if coordinates is None: coordinates = np.empty((0,3)) elif type(coordinates)!= np.ndarray: raise TypeError("Existing coordinate array must be of type np.ndarry") if np.shape(coordinates)[-1] != 3: raise np.AxisError("Last axis of existing coordinate array must be of size 3") ## open the hdf5 group if particle_group is not None: h5_group = handle[particle_group] else: h5_group = handle if "Coordinates" in h5_group.keys(): ## append the (N,3) coordinate array coordinates = np.append(coordinates,h5_group['Coordinates'][()],axis=0) elif ("x" in h5_group.keys() and "y" in h5_group.keys() and "z" in h5_group.keys()): ## read the coordinate data from x,y,z arrays xs = h5_group['x'][()] ys = h5_group['y'][()] zs = h5_group['z'][()] ## initialize a temporary coordinate array to append temp_coordinates = np.zeros((xs.size,3)) temp_coordinates[:,0] = xs temp_coordinates[:,1] = ys temp_coordinates[:,2] = zs ## append the coordinates coordinates = np.append(coordinates,temp_coordinates,axis=0) return coordinates def __getHDF5Fields(self,fname,particle_group,field_names,fieldss): """Extracts field data corresponding to particle positions from .hdf5 file from particle_group matching field_names (field names that are not present are ignored.) :param fname: path to hdf5 data file :type fname: str :param particle_group: name of hdf5 particle group naming :type particle_group: str :param field_names: strings to try and extract from .hdf5 file. :type field_names: list of strs :param fieldss: dictionary contianing mapping between field_name:field_data :type fieldss: dict :return: fieldss :rtype: dict if field_names is not None else None """ if field_names is None: return ## open the hdf5 file and read out field data with h5py.File(fname,'r') as handle: this_group = handle[particle_group] for field_name in field_names: ## allow users to pass a single array of fields that not all particle types ## will match if field_name in this_group.keys(): if field_name in fieldss: ## if it's already there append it fieldss[field_name] = np.append(fieldss[field_name],this_group[field_name]) ## initialize it otherwise else: fieldss[field_name] = this_group[field_name] return fieldss
def split_kwargs(kwargs): kwargs_keys = kwargs.keys() settings_keys = kwargs_keys & set(default_settings.keys()) particlegroup_keys = kwargs_keys - set(default_settings.keys()) return {key:kwargs[key] for key in settings_keys},{key:kwargs[key] for key in particlegroup_keys}