import os
import shutil
import pymongo
import subprocess
import multiprocessing
import warnings
import fnmatch
import astropy.io.fits
import astropy.wcs
from dbtools import DotReachable
class ImageLog(object):
    """Base class for all MongoDB-based image logs.

    The intent is for this base class to access the entire collection of
    image records for a given project. Subclasses can be used that cater to
    specific parts of the data set (such as one instrument or another; one
    observing mode or another). This specificity is created with the
    `queryMask`, which is a query that will always be appended to the
    user's query to ensure that only the requested types of data are
    returned. For example, this can be a query to only accept 'MegaPrime'
    under the `INSTRUME` key.
    """
def __init__(self, dbname, cname, url="localhost", port=27017):
super(ImageLog, self).__init__()
connection = pymongo.Connection(url, port)
self.db = connection[dbname]
self.db.add_son_manipulator(DotReachable())
self.c = self.db[cname]
self.dbname = dbname
self.cname = cname
self.url = url
self.port = port
self.queryMask = {}
self.exts = ["0"]
def __getitem__(self, key):
""":return: a document (`dict` type) for the image named `key`"""
selector = {"_id": key}
selector.update(self.queryMask)
return self.c.find_one(selector)
def _insert_query_mask(self, selector):
"""Enforces the query mask on the selector. The user can still override
the query mask.
"""
selector = dict(selector)
for k, v in self.queryMask.iteritems():
if k in selector: continue
selector[k] = v
return selector
[docs] def set(self, imageKey, key, value, ext=None):
"""Updates an image record by setting the `key` field to the given
`value`.
"""
if ext == None:
self.c.update({"_id": imageKey}, {"$set": {key: value}})
else:
self.c.update({"_id": imageKey},
{"$set": {".".join((str(ext), key)): value}})
[docs] def set_frames(self, key, data):
"""Does an update of data into the `key` field for data of an arbitrary
collection of detectors.
:param data: a dictionary of `frame: datum`, where `frame` is a tuple
of (imageKey, ext)
"""
for (imageKey, ext), datum in data:
self.set(imageKey, key, datum, ext=ext)
[docs] def find(self, selector, images=None, one=False, **mdbArgs):
"""Wrapper around MongoDB `find()`."""
selector = self._insert_query_mask(selector)
if images is not None:
selector.update({"_id": {"$in": images}})
if one:
return self.c.find_one(selector, **mdbArgs)
else:
return self.c.find(selector, **mdbArgs)
[docs] def find_dict(self, selector, images=None, fields=None):
"""Analogous to find(), but formats the returned cursor
into a dictionary."""
c = self.find(selector, images=images, fields=fields, one=False)
records = {}
for doc in c:
doc = dict(doc)
imageKey = doc['_id']
records[imageKey] = doc
return records
[docs] def find_images(self, selector, images=None):
"""Get images keys that match the specified selector using MongoDB
queries.
"""
selector = self._insert_query_mask(selector)
if images is not None:
selector.update({"_id": {"$in": images}})
records = self.c.find(selector, {"_id": 1})
imageKeys = [rec['_id'] for rec in records]
imageKeys.sort()
return imageKeys
[docs] def distinct(self, field, selector, images=None):
"""Return the set of distinct values a field takes over the
selection.
"""
cursor = self.find(selector, images=images, fields=[field])
return cursor.distinct(field)
[docs] def compress_fits(self, pathKey, selector={}, candidateImages=None,
alg="Rice", q=4, delete=False):
""":param alg: Compression algorithm. Any of:
* Rice
* gzip
:param q: quantization for floating-point images.
:param delete: set to true if the un-compressed original should be
deleted
"""
algs = {"Rice": "-r", "gzip": "-g"}
records = self.getiter(selector, pathKey,
candidateImages=candidateImages)
optList = []
if alg == "Rice":
optList.append("%s -q %i" % (algs[alg], q))
elif alg == "gzip":
optList.append(algs[alg])
if delete == True:
optList.append("-D")
options = " ".join(optList)
for rec in records:
origPath = rec[pathKey]
# Compress with fpack
subprocess.call("fpack %s %s" % (options, origPath), shell=True)
# Update filename with .fz extension
outputPath = origPath + ".fz"
self.set(rec['_id'], pathKey, outputPath)
[docs] def decompress_fits(self, pathKey, decompKey=None,
decompDir=None, selector={}, delete=False, overwrite=False,
nthreads=multiprocessing.cpu_count()):
"""Decompresses FITS files at `pathKey`.
:param pathKey: field where FITS paths are found
:param decompKey: (optional) can be set to a field where the
decompressed file can be found. Otherwise, the decompressed
file path is written to `pathKey`.
:param selector: (optional) search criteria dictionary
:param delete: set to True to delete the compressed file.
:param overwrite: if False, then funpack will skip any files
that have already been decompressed. That is, the output file must
be present and recorded in the image log under decompPathKey.
:param nthreads: set to the number of threads. Multiprocessing is
used if ``nthreads`` > 1.
"""
if decompDir is not None:
if os.path.exists(decompDir) is False:
os.makedirs(decompDir)
if decompKey is None:
decompKey = pathKey
records = self.getiter(selector, pathKey)
args = []
for rec in records:
origPath = rec[pathKey]
if decompDir is not None:
outputPath = os.path.join(decompDir,
os.path.basename(os.path.splitext(origPath)[0]))
else:
outputPath = os.path.splitext(origPath)[0]
# verify that this file exists, and possibly skip it
exists = False
if os.path.exists(outputPath):
exists = True
if overwrite: os.remove(outputPath)
if exists:
if decompKey in rec:
if rec[decompKey] == outputPath:
continue # this file is already decomp and recorded
options = ["-O %s" % outputPath]
if delete is True:
options.append("-D")
command = "funpack %s %s" % (" ".join(options), origPath)
print command
args.append((rec['_id'], command, outputPath))
if nthreads > 1:
pool = multiprocessing.Pool(processes=nthreads)
results = pool.map(_funpack_worker, args)
else:
results = map(_funpack_worker, args)
for result in results:
imageKey, outputPath = result
self.set(imageKey, decompKey, outputPath)
[docs] def rename_field(self, dataKeyOld, dataKeyNew, selector=None, multi=True):
"""Renames a field for all image log records found with the optional
selector.
:param dataKeyOld: original field name
:param dataKeyNew: new field name
:param selector: (optional) search selector dictionary
"""
if selector is None:
selector = {}
selector = self._insert_query_mask(selector)
selector.update({dataKeyOld: {"$exists": 1}})
ret = self.c.update(selector,
{"$rename": {dataKeyOld: dataKeyNew}},
multi=multi, safe=True)
print ret
[docs] def delete_field(self, dataKey, selector=None, multi=True):
"""Deletes a field from selected image records.
:param dataKey: field to be deleted.
:param selector: (optional) search selector dictionary
"""
if selector is None:
selector = {}
selector = self._insert_query_mask(selector)
print "using multi delete", multi
self.c.update(selector, {"$unset": {dataKey: 1}},
multi=multi)
[docs] def move_files(self, pathKey, newDir, selector=None, copy=False):
"""Moves a file whose path is found under `pathKey` to the `newDir`
directory. The directory is created if necessary. Old files are
overwritten if necessary.
:param dataKey: field of path for files to be moved
:param newDir: directory where files should be moved to.
:param selector: (optional) search selector dictionary
"""
if os.path.exists(newDir) is False:
os.makedirs(newDir)
if selector is None:
selector = {}
selector = self._insert_query_mask(selector)
for rec in self.getiter(selector, pathKey):
imageKey = rec['_id']
origPath = rec[pathKey]
newPath = os.path.join(newDir, os.path.basename(origPath))
if newPath == origPath:
continue
print origPath, "->", newPath
# if os.path.exists(newPath):
# os.remove(newPath)
if copy:
shutil.copy(origPath, newPath)
else:
shutil.move(origPath, newPath)
self.c.update({"_id": imageKey}, {"$set": {pathKey: newPath}})
[docs] def delete_files(self, pathKey, selector=None):
"""Deletes all files stored under pathKey, and the reference in the
image log.
:param pathKey: data key for path. Can include dot syntax.
:param selector: (optional) MongoDB query dictionay.
"""
if selector is None:
selector = {}
selector = self._insert_query_mask(selector)
selector.update({pathKey: {"$exists": 1}})
for doc in self.c.find(selector, fields=[pathKey]):
imageKey = doc['_id']
try:
path = str(doc[pathKey])
except:
continue
if os.path.exists(path):
os.remove(path)
self.c.update({"_id": imageKey}, {"$unset": pathKey})
[docs] def print_rec(self, imageKey):
"""Pretty-prints the record of `imageKey`"""
selector = {"_id": imageKey}
selector.update(self.queryMask)
record = self.c.find_one(selector)
keys = record.keys()
keys.sort()
print "== %s ==" % imageKey
for key in keys:
if str(key) in self.exts: continue
print "%s:" % key,
print record[key]
[docs] def search(self, selector, candidateImages=None):
"""Get images keys that match the specified selector using MongoDB queries.
:param selector: dictionary for data keys: data values that specifies
what image keys should be returned. Any mongodb search dictionary
will work.
:param candidateImages: (optional) a list of images to draw from; only images
within the candidateImages set will be considered
:return: the list of image keys for images that match the selector
dictionary.
.. deprecated::
Use :meth:`find_images` instead.
"""
warnings.warn(
'search() is deprecated, use find_images() instead',
stacklevel=2)
selector = self._insert_query_mask(selector)
if candidateImages is not None:
candidateImages = [candidateImages]
selector.update({"_id": {"$in": candidateImages}})
records = self.c.find(selector, {"_id": 1})
imageKeys = [rec['_id'] for rec in records]
imageKeys.sort()
return imageKeys
[docs] def getiter(self, selector, dataKeys, candidateImages=None):
"""Returns a cursor to iterate through image records that meet the
given selector. Each record is a dictionary, with the image key
stored under `_id`.
:param selector: dictionary for data keys: data values that specifies
what image keys should be returned. Any mongodb search dictionary
will work.
:param dataKeys: data key(s) for each image that should be returned for
the selected images. Can be string or a sequence of strings.
:param candidateImages: (optional) a **list** of images to draw from;
only images within the candidateImages set will be considered
.. deprecated::
Use :meth:`find` instead.
"""
warnings.warn(
'getiter() is deprecated, use find() instead',
stacklevel=2)
selector = self._insert_query_mask(selector)
print "getiter using selector", selector
if candidateImages is not None:
selector.update({"_id": {"$in": candidateImages}})
if type(dataKeys) == str:
dataKeys = [dataKeys]
return self.c.find(selector, fields=dataKeys)
[docs] def get(self, selector, dataKeys, candidateImages=None):
"""Get a dictionary of `image key: {data key: data value, ...}` for images
that match the search `selector`. The returned dictionary contains
data only with the requested dataKeys (i.e., a subset of the data base
and records for each image are returned.)
:param selector: dictionary for data keys: data values that specifies
what image keys should be returned. Any mongodb search dictionary
will work.
:param dataKeys: data for each image that should be returned for the
selected images.
:param candidateImages: (optional) a **list** of images to draw from;
only images within the candidateImages set will be considered
.. deprecated::
Use :meth:`find_dict` instead.
"""
warnings.warn(
'get() is deprecated, use find_dict() instead',
stacklevel=2)
records = {}
for record in self.getiter(selector, dataKeys, candidateImages=candidateImages):
record = dict(record)
imageKey = record['_id']
records[imageKey] = record
return records
[docs] def get_images(self, imageKeys, dataKeys):
"""Same as `get()`, but operates on a sequence of image keys, rather
than a search selector.
.. deprecated::
Use :meth:`find_dict` instead.
"""
warnings.warn(
'get_images() is deprecated, use find_dict() instead',
stacklevel=2)
return self.get({}, dataKeys, candidateImages=imageKeys)
[docs] def find_unique(self, dataKey, selector={}, candidateImages=None):
"""Get the set of unique values of data key for images that meet the
specified selector.
.. note:: This doesn't actually use the `distinct` aggregation commmand
in PyMongo, since it doesn't yet support queries itself.
:param dataKey: data field whose values will compiled into a set of
unique values.
:param selector: (optional) dictionary for data keys: data values that
specifies what image keys should be returned. Any mongodb search
dictionary will work.
:param candidateImages: (optional) a list of images to draw from; only
images within the candidateImages set will be considered
.. deprecated::
Use :meth:`distinct` instead.
"""
warnings.warn(
'find_unique() is deprecated, use distinct() instead',
stacklevel=2)
records = self.getiter(selector, dataKey, candidateImages=candidateImages)
itemList = [rec[dataKey] for rec in records]
filteredItemList = []
for item in itemList:
if type(item) is unicode:
filteredItemList.append(str(item))
else:
filteredItemList.append(item)
print "all items:", filteredItemList
valueSet = list(set(itemList))
valueSet.sort()
print "value set:", valueSet
return valueSet
def _funpack_worker(args):
"""Worker function for funpacking."""
imageKey, command, outputPath = args
subprocess.call(command, shell=True)
return imageKey, outputPath
class MEFImporter(object):
    """Base class for importing MEF (multi-extension FITS) into an
    image log.

    The user subclass that inherits :class:`MEFImporter` should do several
    things:

    1. Set the ``exts`` attribute to be a list of FITS extension integers
       to import. If the sole FITS image is in the PrimaryHDU, then leave
       this as an empty list. Otherwise, e.g., for WIRCam with images in
       four extension HDUs, set ``self.exts = [1, 2, 3, 4]``.
    2. Set or extend the ``copy_keys`` attribute to be a list of FITS
       header keys to import from the Primary HDU into the base image
       document.
    3. Set or extend the ``copy_ext_keys`` attribute to be a list of FITS
       header keys to import from each image extension.
    4. Implement ``generate_id`` to generate a document ``_id`` string.
    5. (optional) Implement ``post_base_ingest`` to modify the base image
       log document (a dict) after the header keys have been imported.
       This can be useful to add additional metadata, or to change the
       metadata schema from that in the FITS header.
    6. (optional) Implement ``post_ext_ingest`` to modify the document for
       each image extension after the extension header keys are imported.

    Parameters
    ----------
    dbname : str
        Name of MongoDB database.
    cname : str
        Name of MongoDB collection.
    url : str
        URL of MongoDB server.
    port : int
        Port of MongoDB server.
    """
def __init__(self, dbname, cname, url="localhost", port=27017):
super(MEFImporter, self).__init__()
self.connection = pymongo.MongoClient(url, port)
self.db = self.connection[dbname]
self.db.add_son_manipulator(DotReachable())
self.c = self.db[cname]
# Defaults
self.exts = []
self.copy_keys = ['OBJECT', 'FILTER', 'MJDATE',
'EXPTIME', 'INSTRUME', 'RA', 'DEC', 'AIRMASS', 'UTC-OBS']
self.copy_ext_keys = []
[docs] def ingest(self, base_dir, suffix=".fits", recursive=True, preview=False):
"""Runs the import pipeline.
Parameters
----------
base_dir : str
Directory where FITS files can be found.
suffix : str
Suffix of files to import. That is, all paths ending with this
suffix are imported. This import fits files by default. But
change to e.g., `'s.fits'` to get processed CFHT images, or
`'.fits.fz'` to look for encrypted FITS images.
recursive : bool
If `True`, then the pipeline walks through directories contained
in the base directory, looking for FITS files.
preview : bool
If `True` then the documents are *not* inserted into MongoDB,
but only printed. Useful for debugging the ingest.
"""
for path in MEFImporter.all_files(base_dir, "*" + suffix,
single_level=recursive):
self._import_fits(path)
[docs] def ingest_one(self, path, preview=False):
"""Ingest a single FITS file at ``path``.
Parameters
----------
path : str
Path to the FITS image.
preview : bool
If `True` then the documents are *not* inserted into MongoDB,
but only printed. Useful for debugging the ingest.
"""
self._import_fits(path, preview)
@staticmethod
[docs] def all_files(root, pattern, single_level=False):
"""Yield file paths matching a pattern.
Adapted from Python Cookbook, 2nd Ed. 2.16.
"""
for path, subdirs, files in os.walk(root):
files.sort()
for name in files:
if fnmatch.fnmatch(name, pattern):
yield os.path.join(path, name)
break
if single_level:
break
def _import_fits(self, path, preview):
"""Import a FITS file at ``path``."""
doc = {}
f = astropy.io.fits.open(path)
doc['_id'] = self.generate_id(path, f[0].header)
doc.update(self._ingest_fits_base(path, f[0].header, f))
for ext in self.exts:
doc[str(ext)] = self._ingest_fits_ext(f[ext].header, f)
# Put an overall footprint into doc root
if len(self.exts) == 1:
doc['footprint'] = MEFImporter.chip_footprint_polygon(
f[self.exts[0]].header, f)
elif len(self.exts) > 1:
doc['footprint'] = self._combine_footprint(doc)
f.close()
if preview:
print doc
else:
# Insert into MongoDB
self.c.save(doc)
[docs] def generate_id(self, path, header):
"""Generate the object id for this image.
Should be implemented by user. Raises ``NotImplementedError``
otherwise.
"""
raise NotImplementedError
def _ingest_fits_base(self, path, header, hdulist):
"""Build document from base FITS header."""
doc = {}
# Create a footprint if this is an image extension
if len(self.exts) == 0:
doc['footprint'] = MEFImporter.chip_footprint_polygon(header,
hdulist)
for key in self.copy_keys:
try:
doc[key] = header[key]
except:
continue
# Call user method
self.post_base_ingest(doc, path, header)
return doc
[docs] def post_base_ingest(self, doc, path, header):
"""Hook for modifying document after ingesting the base FITS header.
This method can be implemented by the user to add additional data
to the base image log document. Simply add data to to the ``dict``
``doc``.
"""
pass
def _ingest_fits_ext(self, header, hdulist):
"""Build document for an individual extension/chip."""
doc = {}
doc['footprint'] = MEFImporter.chip_footprint_polygon(header, hdulist)
for key in self.copy_ext_keys:
try:
doc[key] = header[key]
except:
continue
# Call user method
self.post_ext_ingest(doc, header)
return doc
[docs] def post_ext_ingest(self, doc, header):
"""Hook for modifying document after ingesting an extension header.
This method can be implemented by the user to add additional data
to the *extension-specific* image log document. Simply add data to
to the ``dict`` ``doc``.
"""
pass
def _combine_footprint(self, doc):
"""Build a footprint to encompass all detectors."""
ras = []
decs = []
for ext in self.exts:
poly = doc['ext']['footprint']
for (ra, dec) in poly:
ras.append(ra)
decs.append(dec)
output_poly = [[min(ras), min(decs)], [max(ras), min(decs)],
[max(ras), max(decs)], [min(ras), max(decs)]]
return output_poly
@staticmethod