Source code for eemont.common

import ee
import json
import pkg_resources
import os
import warnings
import requests
from box import Box
from geopy.geocoders import get_geocoder_for_service

warnings.simplefilter("always", UserWarning)

# STAC
# --------------------------


def _get_platform_STAC(args):
    """Gets the platform (satellite) of an image (or image collection) and wheter if it is a Surface Reflectance product.

    Parameters
    ----------
    args : ee.Image | ee.ImageCollection
        Image or image collection to get the platform from.

    Returns
    -------
    dict
        Platform and product of the image (or image collection).
    """
    eemontDir = os.path.dirname(pkg_resources.resource_filename("eemont", "eemont.py"))
    dataPath = os.path.join(eemontDir, "data/ee-catalog-ids.json")

    f = open(dataPath)
    eeDict = json.load(f)
    platforms = list(eeDict.keys())

    ID = args.get("system:id").getInfo()

    plt = None

    for platform in platforms:

        if eeDict[platform]["gee:type"] == "image_collection" and isinstance(
            args, ee.image.Image
        ):
            pltID = "/".join(ID.split("/")[:-1])
        elif eeDict[platform]["gee:type"] == "image" and isinstance(
            args, ee.imagecollection.ImageCollection
        ):
            pass
        else:
            pltID = ID

        if platform == pltID:
            plt = pltID

        if "_SR" in pltID:
            platformDict = {"platform": plt, "sr": True}
        else:
            platformDict = {"platform": plt, "sr": False}

    if plt is None:
        raise Exception("Sorry, satellite platform not supported!")

    return platformDict


def _getSTAC(args):
    """Gets the STAC of the specified platform.

    Parameters
    ----------
    args : ee.Image | ee.ImageCollection
        Image or image collection to get the STAC from.

    Returns
    -------
    dict
        STAC of the ee.Image or ee.ImageCollection dataset.
    """
    platformDict = _get_platform_STAC(args)

    eemontDir = os.path.dirname(pkg_resources.resource_filename("eemont", "eemont.py"))
    dataPath = os.path.join(eemontDir, "data/ee-catalog-ids.json")

    f = open(dataPath)
    eeDict = json.load(f)

    STAC = requests.get(eeDict[platformDict["platform"]]["href"]).json()

    return STAC


# Spectral Indices
# --------------------------


def _get_expression_map(img, platformDict):
    """Gets the dictionary required for the map parameter in ee.Image.expression() method.

    Parameters
    ----------
    img : ee.Image
        Image to get the dictionary from.
    platformDict : dict
        Dictionary retrieved from the _get_platform() method.

    Returns
    -------
    dict
        Map dictionary for ee.Image.expression.
    """

    def lookupS2(img):
        return {
            "A": img.select("B1"),
            "B": img.select("B2"),
            "G": img.select("B3"),
            "R": img.select("B4"),
            "RE1": img.select("B5"),
            "RE2": img.select("B6"),
            "RE3": img.select("B7"),
            "N": img.select("B8"),
            "RE4": img.select("B8A"),
            "WV": img.select("B9"),
            "S1": img.select("B11"),
            "S2": img.select("B12"),
        }

    def lookupL8(img):
        return {
            "A": img.select("B1"),
            "B": img.select("B2"),
            "G": img.select("B3"),
            "R": img.select("B4"),
            "N": img.select("B5"),
            "S1": img.select("B6"),
            "S2": img.select("B7"),
            "T1": img.select("B10"),
            "T2": img.select("B11"),
        }

    def lookupL457(img):
        return {
            "B": img.select("B1"),
            "G": img.select("B2"),
            "R": img.select("B3"),
            "N": img.select("B4"),
            "S1": img.select("B5"),
            "T1": img.select("B6"),
            "S2": img.select("B7"),
        }

    def lookupMOD09GQ(img):
        return {"R": img.select("sur_refl_b01"), "N": img.select("sur_refl_b02")}

    def lookupMOD09GA(img):
        return {
            "B": img.select("sur_refl_b03"),
            "G": img.select("sur_refl_b04"),
            "R": img.select("sur_refl_b01"),
            "N": img.select("sur_refl_b02"),
            "S1": img.select("sur_refl_b06"),
            "S2": img.select("sur_refl_b07"),
        }

    def lookupMCD43A4(img):
        return {
            "B": img.select("Nadir_Reflectance_Band3"),
            "G": img.select("Nadir_Reflectance_Band4"),
            "R": img.select("Nadir_Reflectance_Band1"),
            "N": img.select("Nadir_Reflectance_Band2"),
            "S1": img.select("Nadir_Reflectance_Band6"),
            "S2": img.select("Nadir_Reflectance_Band7"),
        }

    lookupPlatform = {
        "COPERNICUS/S2": lookupS2,
        "COPERNICUS/S2_SR": lookupS2,
        "LANDSAT/LC08/C01/T1_SR": lookupL8,
        "LANDSAT/LC08/C01/T2_SR": lookupL8,
        "LANDSAT/LE07/C01/T1_SR": lookupL457,
        "LANDSAT/LE07/C01/T2_SR": lookupL457,
        "LANDSAT/LT05/C01/T1_SR": lookupL457,
        "LANDSAT/LT05/C01/T2_SR": lookupL457,
        "LANDSAT/LT04/C01/T1_SR": lookupL457,
        "LANDSAT/LT04/C01/T2_SR": lookupL457,
        "MODIS/006/MOD09GQ": lookupMOD09GQ,
        "MODIS/006/MYD09GQ": lookupMOD09GQ,
        "MODIS/006/MOD09GA": lookupMOD09GA,
        "MODIS/006/MYD09GA": lookupMOD09GA,
        "MODIS/006/MOD09Q1": lookupMOD09GQ,
        "MODIS/006/MYD09Q1": lookupMOD09GQ,
        "MODIS/006/MOD09A1": lookupMOD09GA,
        "MODIS/006/MYD09A1": lookupMOD09GA,
        "MODIS/006/MCD43A4": lookupMCD43A4,
    }

    if platformDict["platform"] not in list(lookupPlatform.keys()):
        raise Exception(
            "Sorry, satellite platform not supported for index computation!"
        )

    return lookupPlatform[platformDict["platform"]](img)


def _get_indices(online):
    """Retrieves the dictionary of indices used for the index() method in ee.Image and ee.ImageCollection classes.

    Parameters
    ----------
    online : boolean
        Wheter to retrieve the most recent list of indices directly from the GitHub repository and not from the local copy.

    Returns
    -------
    dict
        Indices.
    """
    if online:
        indices = requests.get(
            "https://raw.githubusercontent.com/davemlz/awesome-ee-spectral-indices/main/output/spectral-indices-dict.json"
        ).json()
    else:
        eemontDir = os.path.dirname(
            pkg_resources.resource_filename("eemont", "eemont.py")
        )
        dataPath = os.path.join(eemontDir, "data/spectral-indices-dict.json")
        f = open(dataPath)
        indices = json.load(f)

    return indices["SpectralIndices"]


def _get_kernel_image(img, lookup, kernel, sigma, a, b):
    """Creates an ee.Image representing a kernel computed on bands [a] and [b].

    Parameters
    ----------
    img : ee.Image
        Image to compute the kernel on.
    lookup : dict
        Dictionary retrieved from _get_expression_map().
    kernel : str
        Kernel to use.
    sigma : str | float
        Length-scale parameter. Used for kernel = 'RBF'.
    a : str
        Key of the first band to use.
    b : str
        Key of the second band to use.

    Returns
    -------
    ee.Image
        Kernel image.
    """
    if a not in list(lookup.keys()) or b not in list(lookup.keys()):
        return None
    else:
        lookupab = {"a": lookup[a], "b": lookup[b]}
        if isinstance(sigma, str):
            lookup = {**lookup, **lookupab, "sigma": img.expression(sigma, lookupab)}
        else:
            lookup = {**lookup, **lookupab, "sigma": sigma}
        kernels = {
            "linear": "a * b",
            "RBF": "exp((-1.0 * (a - b) ** 2.0)/(2.0 * sigma ** 2.0))",
            "poly": "((a * b) + c) ** p",
        }
        return img.expression(kernels[kernel], lookup)


def _remove_none_dict(dictionary):
    """Removes elements from a dictionary with None values.

    Parameters
    ----------
    dictionary : dict

    Returns
    -------
    dict
        Curated dictionary.
    """
    newDictionary = dict(dictionary)
    for key in dictionary.keys():
        if dictionary[key] is None:
            del newDictionary[key]
    return newDictionary


def _get_kernel_parameters(img, lookup, kernel, sigma):
    """Gets the additional kernel parameters to compute kernel indices.

    Parameters
    ----------
    img : ee.Image
        Image to compute the kernel parameters on.
    lookup : dict
        Dictionary retrieved from _get_expression_map().
    kernel : str
        Kernel to use.
    sigma : str | float
        Length-scale parameter. Used for kernel = 'RBF'.

    Returns
    -------
    dict
        Kernel parameters.
    """
    kernelParameters = {
        "kNN": _get_kernel_image(img, lookup, kernel, sigma, "N", "N"),
        "kNR": _get_kernel_image(img, lookup, kernel, sigma, "N", "R"),
        "kNB": _get_kernel_image(img, lookup, kernel, sigma, "N", "B"),
        "kNL": _get_kernel_image(img, lookup, kernel, sigma, "N", "L"),
        "kGG": _get_kernel_image(img, lookup, kernel, sigma, "G", "G"),
        "kGR": _get_kernel_image(img, lookup, kernel, sigma, "G", "R"),
        "kGB": _get_kernel_image(img, lookup, kernel, sigma, "G", "B"),
        "kBB": _get_kernel_image(img, lookup, kernel, sigma, "B", "B"),
        "kBR": _get_kernel_image(img, lookup, kernel, sigma, "B", "R"),
        "kBL": _get_kernel_image(img, lookup, kernel, sigma, "B", "L"),
        "kRR": _get_kernel_image(img, lookup, kernel, sigma, "R", "R"),
        "kRB": _get_kernel_image(img, lookup, kernel, sigma, "R", "B"),
        "kRL": _get_kernel_image(img, lookup, kernel, sigma, "R", "L"),
        "kLL": _get_kernel_image(img, lookup, kernel, sigma, "L", "L"),
    }

    return kernelParameters


def _index(
    self,
    index,
    G,
    C1,
    C2,
    L,
    cexp,
    nexp,
    alpha,
    slope,
    intercept,
    kernel,
    sigma,
    p,
    c,
    online,
):
    """Computes one or more spectral indices (indices are added as bands) for an image oir image collection.

    Parameters
    ----------
    self : ee.Image | ee.ImageCollection
        Image to compute indices on. Must be scaled to [0,1]. Check the supported platforms in User Guide > Spectral Indices > Supported Platforms.
    index : string | list[string]
        Index or list of indices to compute.
    G : float
        Gain factor. Used just for index = 'EVI'.
    C1 : float
        Coefficient 1 for the aerosol resistance term. Used just for index = 'EVI'.
    C2 : float
        Coefficient 2 for the aerosol resistance term. Used just for index = 'EVI'.
    L : float
        Canopy background adjustment. Used just for index = ['EVI','SAVI'].
    cexp : float
        Exponent used for OCVI.
    nexp : float
        Exponent used for GDVI.
    alpha : float
        Weighting coefficient  used for WDRVI.
    slope : float
        Soil line slope.
    intercept : float
        Soil line intercept.
    kernel : str
        Kernel used for kernel indices.
    sigma : str | float
        Length-scale parameter. Used for kernel = 'RBF'. If str, this must be an expression including 'a' and 'b'. If numeric, this must be positive.
    p : float
        Kernel degree. Used for kernel = 'poly'.
    c : float
        Free parameter that trades off the influence of higher-order versus lower-order terms. Used for kernel = 'poly'. This must be greater than or equal to 0.
    online : boolean
        Wheter to retrieve the most recent list of indices directly from the GitHub repository and not from the local copy.

    Returns
    -------
    ee.Image | ee.ImageCollection
        Image (or Image Collection) with the computed spectral index, or indices, as new bands.
    """
    platformDict = _get_platform_STAC(self)

    if isinstance(sigma, int) or isinstance(sigma, float):
        if sigma < 0:
            raise Exception("[sigma] must be positive!")

    if p <= 0 or c < 0:
        raise Exception("[p] and [c] must be positive!")

    additionalParameters = {
        "g": float(G),
        "C1": float(C1),
        "C2": float(C2),
        "L": float(L),
        "cexp": float(cexp),
        "nexp": float(nexp),
        "alpha": float(alpha),
        "sla": float(slope),
        "slb": float(intercept),
        "p": float(p),
        "c": float(c),
    }

    spectralIndices = _get_indices(online)
    indicesNames = list(spectralIndices.keys())

    if not isinstance(index, list):
        if index == "all":
            index = list(spectralIndices.keys())
        elif index in [
            "vegetation",
            "burn",
            "water",
            "snow",
            "drought",
            "urban",
            "kernel",
        ]:
            temporalListOfIndices = []
            for idx in indicesNames:
                if spectralIndices[idx]["type"] == index:
                    temporalListOfIndices.append(idx)
            index = temporalListOfIndices
        else:
            index = [index]

    for idx in index:
        if idx not in list(spectralIndices.keys()):
            warnings.warn(
                "Index " + idx + " is not a built-in index and it won't be computed!"
            )
        else:

            def temporalIndex(img):
                lookupDic = _get_expression_map(img, platformDict)
                lookupDic = {**lookupDic, **additionalParameters}
                kernelParameters = _get_kernel_parameters(img, lookupDic, kernel, sigma)
                lookupDic = {**lookupDic, **kernelParameters}
                lookupDicCurated = _remove_none_dict(lookupDic)
                if all(
                    band in list(lookupDicCurated.keys())
                    for band in spectralIndices[idx]["bands"]
                ):
                    return img.addBands(
                        img.expression(
                            spectralIndices[idx]["formula"], lookupDicCurated
                        ).rename(idx)
                    )
                else:
                    warnings.warn(
                        "This platform doesn't have the required bands for "
                        + idx
                        + " computation!"
                    )
                    return img

            if isinstance(self, ee.imagecollection.ImageCollection):
                self = self.map(temporalIndex)
            elif isinstance(self, ee.image.Image):
                self = temporalIndex(self)

    return self


[docs]def indices(online=False): """Gets the dictionary of available indices as a Box object. Parameters ---------- online : boolean Wheter to retrieve the most recent list of indices directly from the GitHub repository and not from the local copy. Returns ------- Box Dictionary of available indices. For each index, the keys 'short_name', 'long_name', 'formula', 'bands', 'reference', 'type', 'date_of_addition' and 'contributor' can be checked. See Also -------- listIndices : Gets the list of available indices. Examples -------- >>> import eemont >>> indices = eemont.indices() >>> indices.BAIS2.long_name 'Burned Area Index for Sentinel 2' >>> indices.BAIS2.formula '(1.0 - ((RE2 * RE3 * RE4) / R) ** 0.5) * (((S2 - RE4)/(S2 + RE4) ** 0.5) + 1.0)' >>> indices.BAIS2.reference 'https://doi.org/10.3390/ecrs-2-05177' """ return Box(_get_indices(online), frozen_box=True)
[docs]def listIndices(online=False): """Gets the list of available indices. Parameters ---------- online : boolean Wheter to retrieve the most recent list of indices directly from the GitHub repository and not from the local copy. Returns ------- list List of available indices. See Also -------- indices : Gets the dictionary of available indices as a Box object. Examples -------- >>> import eemont >>> eemont.listIndices() ['BNDVI','CIG','CVI','EVI','EVI2','GBNDVI','GNDVI',...] """ return list(_get_indices(online).keys())
# Image Scaling # -------------------------- def _get_scale_params(args): """Gets the scale parameters for each band of the image or image collection. Parameters ---------- args : ee.Image | ee.ImageCollection Image or image collection to get the scale parameters from. Returns ------- dict Dictionary with the scale parameters for each band. """ platformDict = _get_platform_STAC(args) eemontDir = os.path.dirname(pkg_resources.resource_filename("eemont", "eemont.py")) dataPath = os.path.join(eemontDir, "data/ee-catalog-scale.json") f = open(dataPath) eeDict = json.load(f) platforms = list(eeDict.keys()) if platformDict["platform"] not in platforms: warnings.warn("This platform is not supported for getting scale parameters.") return None else: return eeDict[platformDict["platform"]] def _get_offset_params(args): """Gets the offset parameters for each band of the image or image collection. Parameters ---------- args : ee.Image | ee.ImageCollection Image or image collection to get the offset parameters from. Returns ------- dict Dictionary with the offset parameters for each band. """ platformDict = _get_platform_STAC(args) eemontDir = os.path.dirname(pkg_resources.resource_filename("eemont", "eemont.py")) dataPath = os.path.join(eemontDir, "data/ee-catalog-offset.json") f = open(dataPath) eeDict = json.load(f) platforms = list(eeDict.keys()) if platformDict["platform"] not in platforms: warnings.warn("This platform is not supported for getting offset parameters.") return None else: return eeDict[platformDict["platform"]] def _scale_STAC(self): """Scales bands on an image or image collection. Parameters ---------- self : ee.Image | ee.ImageCollection Image or iage collection to scale. Returns ------- ee.Image | ee.ImageCollection Scaled image or image collection. """ scaleParams = _get_scale_params(self) offsetParams = _get_offset_params(self) if scaleParams is None or offsetParams is None: warnings.warn("This platform is not supported for scaling and offsetting.") return self else: scaleParams = ee.Dictionary(scaleParams).toImage() offsetParams = ee.Dictionary(offsetParams).toImage() def scaleOffset(img): bands = img.bandNames() scaleList = scaleParams.bandNames() bands = bands.filter(ee.Filter.inList("item", scaleList)) SOscaleParams = scaleParams.select(bands) SOoffsetParams = offsetParams.select(bands) scaled = img.select(bands).multiply(SOscaleParams).add(SOoffsetParams) return ee.Image(scaled.copyProperties(img, img.propertyNames())) if isinstance(self, ee.image.Image): scaled = scaleOffset(self) elif isinstance(self, ee.imagecollection.ImageCollection): scaled = self.map(scaleOffset) return scaled # Cloud Masking # -------------------------- def _maskClouds( self, method, prob, maskCirrus, maskShadows, scaledImage, dark, cloudDist, buffer, cdi, ): """Masks clouds and shadows in an image or image collection (valid just for Surface Reflectance products). Parameters ---------- self : ee.Image | ee.ImageCollection Image or image collection to mask. method : string, default = 'cloud_prob' Method used to mask clouds.\n Available options: - 'cloud_prob' : Use cloud probability. - 'qa' : Use Quality Assessment band. This parameter is ignored for Landsat products. prob : numeric [0, 100], default = 60 Cloud probability threshold. Valid just for method = 'prob'. This parameter is ignored for Landsat products. maskCirrus : boolean, default = True Whether to mask cirrus clouds. Valid just for method = 'qa'. This parameter is ignored for Landsat products. maskShadows : boolean, default = True Whether to mask cloud shadows. For more info see 'Braaten, J. 2020. Sentinel-2 Cloud Masking with s2cloudless. Google Earth Engine, Community Tutorials'. scaledImage : boolean, default = False Whether the pixel values are scaled to the range [0,1] (reflectance values). This parameter is ignored for Landsat products. dark : float [0,1], default = 0.15 NIR threshold. NIR values below this threshold are potential cloud shadows. This parameter is ignored for Landsat products. cloudDist : int, default = 1000 Maximum distance in meters (m) to look for cloud shadows from cloud edges. This parameter is ignored for Landsat products. buffer : int, default = 250 Distance in meters (m) to dilate cloud and cloud shadows objects. This parameter is ignored for Landsat products. cdi : float [-1,1], default = None Cloud Displacement Index threshold. Values below this threshold are considered potential clouds. A cdi = None means that the index is not used. For more info see 'Frantz, D., HaS, E., Uhl, A., Stoffels, J., Hill, J. 2018. Improvement of the Fmask algorithm for Sentinel-2 images: Separating clouds from bright surfaces based on parallax effects. Remote Sensing of Environment 2015: 471-481'. This parameter is ignored for Landsat products. Returns ------- ee.Image | ee.ImageCollection Cloud-shadow masked image or image collection. """ def S3(args): qa = args.select("quality_flags") notCloud = qa.bitwiseAnd(1 << 27).eq(0) return args.updateMask(notCloud) def S2(args): def cloud_prob(img): clouds = ee.Image(img.get("cloud_mask")).select("probability") isCloud = clouds.gte(prob).rename("CLOUD_MASK") return img.addBands(isCloud) def QA(img): qa = img.select("QA60") cloudBitMask = 1 << 10 isCloud = qa.bitwiseAnd(cloudBitMask).eq(0) if maskCirrus: cirrusBitMask = 1 << 11 isCloud = isCloud.And(qa.bitwiseAnd(cirrusBitMask).eq(0)) isCloud = isCloud.Not().rename("CLOUD_MASK") return img.addBands(isCloud) def CDI(img): idx = img.get("system:index") S2TOA = ( ee.ImageCollection("COPERNICUS/S2") .filter(ee.Filter.eq("system:index", idx)) .first() ) CloudDisplacementIndex = ee.Algorithms.Sentinel2.CDI(S2TOA) isCloud = CloudDisplacementIndex.lt(cdi).rename("CLOUD_MASK_CDI") return img.addBands(isCloud) def get_shadows(img): notWater = img.select("SCL").neq(6) if not scaledImage: darkPixels = img.select("B8").lt(dark * 1e4).multiply(notWater) else: darkPixels = img.select("B8").lt(dark).multiply(notWater) shadowAzimuth = ee.Number(90).subtract( ee.Number(img.get("MEAN_SOLAR_AZIMUTH_ANGLE")) ) cloudProjection = img.select("CLOUD_MASK").directionalDistanceTransform( shadowAzimuth, cloudDist / 10 ) cloudProjection = ( cloudProjection.reproject(crs=img.select(0).projection(), scale=10) .select("distance") .mask() ) isShadow = cloudProjection.multiply(darkPixels).rename("SHADOW_MASK") return img.addBands(isShadow) def clean_dilate(img): isCloudShadow = img.select("CLOUD_MASK") if cdi != None: isCloudShadow = isCloudShadow.And(img.select("CLOUD_MASK_CDI")) if maskShadows: isCloudShadow = isCloudShadow.add(img.select("SHADOW_MASK")).gt(0) isCloudShadow = ( isCloudShadow.focal_min(20, units="meters") .focal_max(buffer * 2 / 10, units="meters") .rename("CLOUD_SHADOW_MASK") ) return img.addBands(isCloudShadow) def apply_mask(img): return img.updateMask(img.select("CLOUD_SHADOW_MASK").Not()) if isinstance(self, ee.image.Image): if method == "cloud_prob": S2Clouds = ee.ImageCollection("COPERNICUS/S2_CLOUD_PROBABILITY") fil = ee.Filter.equals( leftField="system:index", rightField="system:index" ) S2WithCloudMask = ee.Join.saveFirst("cloud_mask").apply( ee.ImageCollection(args), S2Clouds, fil ) S2Masked = ee.ImageCollection(S2WithCloudMask).map(cloud_prob).first() elif method == "qa": S2Masked = QA(args) if cdi != None: S2Masked = CDI(S2Masked) if maskShadows: S2Masked = get_shadows(S2Masked) S2Masked = apply_mask(clean_dilate(S2Masked)) elif isinstance(self, ee.imagecollection.ImageCollection): if method == "cloud_prob": S2Clouds = ee.ImageCollection("COPERNICUS/S2_CLOUD_PROBABILITY") fil = ee.Filter.equals( leftField="system:index", rightField="system:index" ) S2WithCloudMask = ee.Join.saveFirst("cloud_mask").apply( args, S2Clouds, fil ) S2Masked = ee.ImageCollection(S2WithCloudMask).map(cloud_prob) elif method == "qa": S2Masked = args.map(QA) if cdi != None: S2Masked = S2Masked.map(CDI) if maskShadows: S2Masked = S2Masked.map(get_shadows) S2Masked = S2Masked.map(clean_dilate).map(apply_mask) return S2Masked def L8(args): cloudsBitMask = 1 << 5 qa = args.select("pixel_qa") mask = qa.bitwiseAnd(cloudsBitMask).eq(0) if maskShadows: cloudShadowBitMask = 1 << 3 mask = mask.And(qa.bitwiseAnd(cloudShadowBitMask).eq(0)) return args.updateMask(mask) def L457(args): qa = args.select("pixel_qa") cloud = qa.bitwiseAnd(1 << 5).And(qa.bitwiseAnd(1 << 7)) if maskShadows: cloud = cloud.Or(qa.bitwiseAnd(1 << 3)) mask2 = args.mask().reduce(ee.Reducer.min()) return args.updateMask(cloud.Not()).updateMask(mask2) def MOD09GA(args): qa = args.select("state_1km") notCloud = qa.bitwiseAnd(1 << 0).eq(0) if maskShadows: notCloud = notCloud.And(qa.bitwiseAnd(1 << 2).eq(0)) if maskCirrus: notCloud = notCloud.And(qa.bitwiseAnd(1 << 8).eq(0)) return args.updateMask(notCloud) def MCD15A3H(args): qa = args.select("FparExtra_QC") notCloud = qa.bitwiseAnd(1 << 5).eq(0) if maskShadows: notCloud = notCloud.And(qa.bitwiseAnd(1 << 6).eq(0)) if maskCirrus: notCloud = notCloud.And(qa.bitwiseAnd(1 << 4).eq(0)) return args.updateMask(notCloud) def MOD09Q1(args): qa = args.select("State") notCloud = qa.bitwiseAnd(1 << 0).eq(0) if maskShadows: notCloud = notCloud.And(qa.bitwiseAnd(1 << 2).eq(0)) if maskCirrus: notCloud = notCloud.And(qa.bitwiseAnd(1 << 8).eq(0)) return args.updateMask(notCloud) def MOD09A1(args): qa = args.select("StateQA") notCloud = qa.bitwiseAnd(1 << 0).eq(0) if maskShadows: notCloud = notCloud.And(qa.bitwiseAnd(1 << 2).eq(0)) if maskCirrus: notCloud = notCloud.And(qa.bitwiseAnd(1 << 8).eq(0)) return args.updateMask(notCloud) def MOD17A2H(args): qa = args.select("Psn_QC") notCloud = qa.bitwiseAnd(1 << 3).eq(0) return args.updateMask(notCloud) def MOD16A2(args): qa = args.select("ET_QC") notCloud = qa.bitwiseAnd(1 << 3).eq(0) return args.updateMask(notCloud) def MOD13Q1A1(args): qa = args.select("SummaryQA") notCloud = qa.bitwiseAnd(1 << 0).eq(0) return args.updateMask(notCloud) def MOD13A2(args): qa = args.select("SummaryQA") notCloud = qa.eq(0) return args.updateMask(notCloud) def VNP09GA(args): qf1 = args.select("QF1") qf2 = args.select("QF2") notCloud = qf1.bitwiseAnd(1 << 2).eq(0) if maskShadows: notCloud = notCloud.And(qf2.bitwiseAnd(1 << 3).eq(0)) if maskCirrus: notCloud = notCloud.And(qf2.bitwiseAnd(1 << 6).eq(0)) notCloud = notCloud.And(qf2.bitwiseAnd(1 << 7).eq(0)) return args.updateMask(notCloud) def VNP13A1(args): qa = args.select("pixel_reliability") notCloud = qa.neq(9) if maskShadows: notCloud = notCloud.And(qa.neq(7)) return args.updateMask(notCloud) lookup = { "COPERNICUS/S3/OLCI": S3, "COPERNICUS/S2_SR": S2, "LANDSAT/LC08/C01/T1_SR": L8, "LANDSAT/LC08/C01/T2_SR": L8, "LANDSAT/LE07/C01/T1_SR": L457, "LANDSAT/LE07/C01/T2_SR": L457, "LANDSAT/LT05/C01/T1_SR": L457, "LANDSAT/LT05/C01/T2_SR": L457, "LANDSAT/LT04/C01/T1_SR": L457, "LANDSAT/LT04/C01/T2_SR": L457, "MODIS/006/MOD09GA": MOD09GA, "MODIS/006/MCD15A3H": MCD15A3H, "MODIS/006/MOD09Q1": MOD09Q1, "MODIS/006/MOD09A1": MOD09A1, "MODIS/006/MOD17A2H": MOD17A2H, "MODIS/006/MOD16A2": MOD16A2, "MODIS/006/MOD13Q1": MOD13Q1A1, "MODIS/006/MOD13A1": MOD13Q1A1, "MODIS/006/MOD13A2": MOD13A2, "MODIS/006/MYD09GA": MOD09GA, "MODIS/006/MYD09Q1": MOD09Q1, "MODIS/006/MYD09A1": MOD09A1, "MODIS/006/MYD17A2H": MOD17A2H, "MODIS/006/MYD16A2": MOD16A2, "MODIS/006/MYD13Q1": MOD13Q1A1, "MODIS/006/MYD13A1": MOD13Q1A1, "MODIS/006/MYD13A2": MOD13A2, "NOAA/VIIRS/001/VNP09GA": VNP09GA, "NOAA/VIIRS/001/VNP13A1": VNP13A1, } platformDict = _get_platform_STAC(self) if platformDict["platform"] not in list(lookup.keys()): warnings.warn("This platform is not supported for cloud masking.") return self else: if isinstance(self, ee.image.Image): masked = lookup[platformDict["platform"]](self) elif isinstance(self, ee.imagecollection.ImageCollection): if platformDict["platform"] == "COPERNICUS/S2_SR": masked = lookup[platformDict["platform"]](self) else: masked = self.map(lookup[platformDict["platform"]]) return masked # Preprocessing # -------------------------- def _preprocess(self, **kwargs): """Pre-process the image, or image collection: masks clouds and shadows, and scales and offsets the image, or image collection. Parameters ---------- self : ee.Image | ee.ImageCollection Image or Image Collection to pre-process. **kwargs : Keywords arguments for maskClouds(). Returns ------- ee.Image | ee.ImageCollection Pre-processed image or image collection. """ maskCloudsDefault = { "method": "cloud_prob", "prob": 60, "maskCirrus": True, "maskShadows": True, "scaledImage": False, "dark": 0.15, "cloudDist": 1000, "buffer": 250, "cdi": None, } for key, value in maskCloudsDefault.items(): if key not in kwargs.keys(): kwargs[key] = value self = _maskClouds(self, **kwargs) self = _scale_STAC(self) return self # Citation Tools # -------------------------- def _getDOI(args): """Gets the DOI of the specified platform, if available. Parameters ---------- args : ee.Image | ee.ImageCollection Image or image collection to get the DOI from. Returns ------- str DOI of the ee.Image or ee.ImageCollection dataset. """ platformDict = _get_platform_STAC(args) eemontDir = os.path.dirname(pkg_resources.resource_filename("eemont", "eemont.py")) dataPath = os.path.join(eemontDir, "data/ee-catalog-ids.json") f = open(dataPath) eeDict = json.load(f) return eeDict[platformDict["platform"]]["sci:doi"] def _getCitation(args): """Gets the citation of the specified platform, if available. Parameters ---------- args : ee.Image | ee.ImageCollection Image or image collection to get the citation from. Returns ------- str Citation of the ee.Image or ee.ImageCollection dataset. """ platformDict = _get_platform_STAC(args) eemontDir = os.path.dirname(pkg_resources.resource_filename("eemont", "eemont.py")) dataPath = os.path.join(eemontDir, "data/ee-catalog-ids.json") f = open(dataPath) eeDict = json.load(f) return eeDict[platformDict["platform"]]["sci:citation"] # Geocoding # -------------------------- def _retrieve_location(query, geocoder, exactly_one, **kwargs): """Retrieves a location from a query. Parameters ---------- query : str Address, query or structured query to geocode. geocoder : str Geocoder to use. Please visit https://geopy.readthedocs.io/ for more info. exactly_one : boolean Whether to retrieve just one location. **kwargs : Keywords arguments for geolocator.geocode(). The user_agent argument is mandatory (this argument can be set as user_agent = 'my-gee-username' or user_agent = 'my-gee-app-name'). Please visit https://geopy.readthedocs.io/ for more info. Returns ------- Location Retrieved location. """ cls = get_geocoder_for_service(geocoder) geolocator = cls(**kwargs) location = geolocator.geocode(query, exactly_one=exactly_one) if location is None: raise Exception("No matches were found for your query!") else: return location def _lnglat_from_location(location): """Returns the longitude and latitude from a location. Parameters ---------- location : Location Retrieved location. Must be only one location. Returns ------- tuple The longitude and latitude geocoded from the query. """ return [location.longitude, location.latitude]