Skip to content

Documentation for Store

Bases: object

Store class to access granules on-prem or in the cloud.

Store is the class to access data.

Parameters:

Name Type Description Default
auth Any

Auth instance to download and access data.

required
Source code in earthaccess/store.py
def __init__(self, auth: Any, pre_authorize: bool = False) -> None:
    """Store is the class to access data.

    Parameters:
        auth: Auth instance to download and access data.
    """
    if auth.authenticated is True:
        self.auth = auth
        self._s3_credentials: Dict[
            Tuple, Tuple[datetime.datetime, Dict[str, str]]
        ] = {}
        oauth_profile = f"https://{auth.system.edl_hostname}/profile"
        # sets the initial URS cookie
        self._requests_cookies: Dict[str, Any] = {}
        self.set_requests_session(oauth_profile)
        if pre_authorize:
            # collect cookies from other DAACs
            for url in DAAC_TEST_URLS:
                self.set_requests_session(url)

    else:
        logger.warning("The current session is not authenticated with NASA")
        self.auth = None
    self.in_region = self._running_in_us_west_2()

get(granules, local_path=None, provider=None, threads=8, *, pqdm_kwargs=None)

Retrieves data granules from a remote storage system.

  • If we run this in the cloud, we are moving data from S3 to a cloud compute instance (EC2, AWS Lambda).
  • If we run it outside the us-west-2 region and the data granules are part of a cloud-based collection, the method will not get any files.
  • If we request data granules from an on-prem collection, the data will be effectively downloaded to a local directory.

Parameters:

Name Type Description Default
granules Union[List[DataGranule], List[str]]

A list of granules(DataGranule) instances or a list of granule links (HTTP).

required
local_path Optional[Union[Path, str]]

Local directory to store the remote data granules. If not supplied, defaults to a subdirectory of the current working directory of the form data/YYYY-MM-DD-UUID, where YYYY-MM-DD is the year, month, and day of the current date, and UUID is the last 6 digits of a UUID4 value.

None
provider Optional[str]

a valid cloud provider, each DAAC has a provider code for their cloud distributions

None
threads int

Parallel number of threads to use to download the files; adjust as necessary, default = 8.

8
pqdm_kwargs Optional[Mapping[str, Any]]

Additional keyword arguments to pass to pqdm, a parallel processing library. See pqdm documentation for available options. Default is to use immediate exception behavior and the number of jobs specified by the threads parameter.

None

Returns:

Type Description
List[str]

List of downloaded files

Source code in earthaccess/store.py
def get(
    self,
    granules: Union[List[DataGranule], List[str]],
    local_path: Optional[Union[Path, str]] = None,
    provider: Optional[str] = None,
    threads: int = 8,
    *,
    pqdm_kwargs: Optional[Mapping[str, Any]] = None,
) -> List[str]:
    """Retrieves data granules from a remote storage system.

       * If we run this in the cloud,
         we are moving data from S3 to a cloud compute instance (EC2, AWS Lambda).
       * If we run it outside the us-west-2 region and the data granules are part of a cloud-based
         collection, the method will not get any files.
       * If we request data granules from an on-prem collection,
         the data will be effectively downloaded to a local directory.

    Parameters:
        granules: A list of granules(DataGranule) instances or a list of granule links (HTTP).
        local_path: Local directory to store the remote data granules.  If not
            supplied, defaults to a subdirectory of the current working directory
            of the form `data/YYYY-MM-DD-UUID`, where `YYYY-MM-DD` is the year,
            month, and day of the current date, and `UUID` is the last 6 digits
            of a UUID4 value.
        provider: a valid cloud provider, each DAAC has a provider code for their cloud distributions
        threads: Parallel number of threads to use to download the files;
            adjust as necessary, default = 8.
        pqdm_kwargs: Additional keyword arguments to pass to pqdm, a parallel processing library.
            See pqdm documentation for available options. Default is to use immediate exception behavior
            and the number of jobs specified by the `threads` parameter.

    Returns:
        List of downloaded files
    """
    if not granules:
        raise ValueError("List of URLs or DataGranule instances expected")

    if local_path is None:
        today = datetime.datetime.now().strftime("%Y-%m-%d")
        uuid = uuid4().hex[:6]
        local_path = Path.cwd() / "data" / f"{today}-{uuid}"

    pqdm_kwargs = {
        "n_jobs": threads,
        **(pqdm_kwargs or {}),
    }

    return self._get(granules, Path(local_path), provider, pqdm_kwargs=pqdm_kwargs)

get_fsspec_session() cached

Returns a fsspec HTTPS session with bearer tokens that are used by CMR.

This HTTPS session can be used to download granules if we want to use a direct, lower level API.

Returns:

Type Description
AbstractFileSystem

fsspec HTTPFileSystem (aiohttp client session)

Source code in earthaccess/store.py
@lru_cache
def get_fsspec_session(self) -> fsspec.AbstractFileSystem:
    """Returns a fsspec HTTPS session with bearer tokens that are used by CMR.

    This HTTPS session can be used to download granules if we want to use a direct,
    lower level API.

    Returns:
        fsspec HTTPFileSystem (aiohttp client session)
    """
    token = self.auth.token["access_token"]
    client_kwargs = {
        "headers": {"Authorization": f"Bearer {token}"},
        # This is important! If we trust the env and send a bearer token,
        # auth will fail!
        "trust_env": False,
    }
    session = fsspec.filesystem("https", client_kwargs=client_kwargs)
    return session

get_requests_session(bearer_token=True)

Returns a requests HTTPS session with bearer tokens that are used by CMR.

This HTTPS session can be used to download granules if we want to use a direct, lower level API.

Parameters:

Name Type Description Default
bearer_token bool

if true, will be used for authenticated queries on CMR

True

Returns:

Type Description
Session

requests Session

Source code in earthaccess/store.py
def get_requests_session(self, bearer_token: bool = True) -> requests.Session:
    """Returns a requests HTTPS session with bearer tokens that are used by CMR.

    This HTTPS session can be used to download granules if we want to use a direct,
    lower level API.

    Parameters:
        bearer_token: if true, will be used for authenticated queries on CMR

    Returns:
        requests Session
    """
    return self.auth.get_session()

get_s3_filesystem(daac=None, concept_id=None, provider=None, endpoint=None)

Return an s3fs.S3FileSystem instance for a given cloud provider / DAAC.

Parameters:

Name Type Description Default
daac Optional[str]

any of the DAACs, e.g. NSIDC, PODAAC

None
provider Optional[str]

a data provider if we know them, e.g. PODAAC -> POCLOUD

None
endpoint Optional[str]

pass the URL for the credentials directly

None

Returns:

Type Description
S3FileSystem

a s3fs file instance

Source code in earthaccess/store.py
def get_s3_filesystem(
    self,
    daac: Optional[str] = None,
    concept_id: Optional[str] = None,
    provider: Optional[str] = None,
    endpoint: Optional[str] = None,
) -> s3fs.S3FileSystem:
    """Return an `s3fs.S3FileSystem` instance for a given cloud provider / DAAC.

    Parameters:
        daac: any of the DAACs, e.g. NSIDC, PODAAC
        provider: a data provider if we know them, e.g. PODAAC -> POCLOUD
        endpoint: pass the URL for the credentials directly

    Returns:
        a s3fs file instance
    """
    if self.auth is None:
        raise ValueError(
            "A valid Earthdata login instance is required to retrieve S3 credentials"
        )
    if not any([concept_id, daac, provider, endpoint]):
        raise ValueError(
            "At least one of the concept_id, daac, provider or endpoint"
            "parameters must be specified. "
        )

    if concept_id is not None:
        provider = self._derive_concept_provider(concept_id)

    # Get existing S3 credentials if we already have them
    location = (
        daac,
        provider,
        endpoint,
    )  # Identifier for where to get S3 credentials from
    need_new_creds = False
    try:
        dt_init, creds = self._s3_credentials[location]
    except KeyError:
        need_new_creds = True
    else:
        # If cached credentials are expired, invalidate the cache
        delta = datetime.datetime.now() - dt_init
        if round(delta.seconds / 60, 2) > 55:
            need_new_creds = True
            self._s3_credentials.pop(location)

    if need_new_creds:
        # Don't have existing valid S3 credentials, so get new ones
        now = datetime.datetime.now()
        if endpoint is not None:
            creds = self.auth.get_s3_credentials(endpoint=endpoint)
        elif daac is not None:
            creds = self.auth.get_s3_credentials(daac=daac)
        elif provider is not None:
            creds = self.auth.get_s3_credentials(provider=provider)
        # Include new credentials in the cache
        self._s3_credentials[location] = now, creds

    return s3fs.S3FileSystem(
        key=creds["accessKeyId"],
        secret=creds["secretAccessKey"],
        token=creds["sessionToken"],
    )

get_s3fs_session(daac=None, concept_id=None, provider=None, endpoint=None)

Returns a s3fs instance for a given cloud provider / DAAC.

Parameters:

Name Type Description Default
daac Optional[str]

any of the DAACs, e.g. NSIDC, PODAAC

None
provider Optional[str]

a data provider if we know them, e.g. PODAAC -> POCLOUD

None
endpoint Optional[str]

pass the URL for the credentials directly

None

Returns:

Type Description
S3FileSystem

An s3fs.S3FileSystem authenticated for reading in-region in us-west-2 for 1 hour.

Source code in earthaccess/store.py
@deprecated("Use get_s3_filesystem instead")
def get_s3fs_session(
    self,
    daac: Optional[str] = None,
    concept_id: Optional[str] = None,
    provider: Optional[str] = None,
    endpoint: Optional[str] = None,
) -> s3fs.S3FileSystem:
    """Returns a s3fs instance for a given cloud provider / DAAC.

    Parameters:
       daac: any of the DAACs, e.g. NSIDC, PODAAC
       provider: a data provider if we know them, e.g. PODAAC -> POCLOUD
       endpoint: pass the URL for the credentials directly

    Returns:
       An `s3fs.S3FileSystem` authenticated for reading in-region in us-west-2 for 1 hour.
    """
    return self.get_s3_filesystem(daac, concept_id, provider, endpoint)

open(granules, provider=None, *, pqdm_kwargs=None)

Returns a list of file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray.

Parameters:

Name Type Description Default
granules Union[List[str], List[DataGranule]]

a list of granule instances or list of URLs, e.g. s3://some-granule. If a list of URLs is passed, we need to specify the data provider.

required
provider Optional[str]

e.g. POCLOUD, NSIDC_CPRD, etc.

None
pqdm_kwargs Optional[Mapping[str, Any]]

Additional keyword arguments to pass to pqdm, a parallel processing library. See pqdm documentation for available options. Default is to use immediate exception behavior and the number of jobs specified by the threads parameter.

None

Returns:

Type Description
List[AbstractBufferedFile]

A list of "file pointers" to remote (i.e. s3 or https) files.

Source code in earthaccess/store.py
def open(
    self,
    granules: Union[List[str], List[DataGranule]],
    provider: Optional[str] = None,
    *,
    pqdm_kwargs: Optional[Mapping[str, Any]] = None,
) -> List[fsspec.spec.AbstractBufferedFile]:
    """Returns a list of file-like objects that can be used to access files
    hosted on S3 or HTTPS by third party libraries like xarray.

    Parameters:
        granules: a list of granule instances **or** list of URLs, e.g. `s3://some-granule`.
            If a list of URLs is passed, we need to specify the data provider.
        provider: e.g. POCLOUD, NSIDC_CPRD, etc.
        pqdm_kwargs: Additional keyword arguments to pass to pqdm, a parallel processing library.
            See pqdm documentation for available options. Default is to use immediate exception behavior
            and the number of jobs specified by the `threads` parameter.

    Returns:
        A list of "file pointers" to remote (i.e. s3 or https) files.
    """
    if len(granules):
        return self._open(granules, provider, pqdm_kwargs=pqdm_kwargs)
    return []

set_requests_session(url, method='get', bearer_token=False)

Sets up a requests session with bearer tokens that are used by CMR.

Mainly used to get the authentication cookies from different DAACs and URS. This HTTPS session can be used to download granules if we want to use a direct, lower level API.

Parameters:

Name Type Description Default
url str

used to test the credentials and populate the class auth cookies

required
method str

HTTP method to test, default: "GET"

'get'
bearer_token bool

if true, will be used for authenticated queries on CMR

False

Returns:

Type Description
None

fsspec HTTPFileSystem (aiohttp client session)

Source code in earthaccess/store.py
def set_requests_session(
    self, url: str, method: str = "get", bearer_token: bool = False
) -> None:
    """Sets up a `requests` session with bearer tokens that are used by CMR.

    Mainly used to get the authentication cookies from different DAACs and URS.
    This HTTPS session can be used to download granules if we want to use a direct,
    lower level API.

    Parameters:
        url: used to test the credentials and populate the class auth cookies
        method: HTTP method to test, default: "GET"
        bearer_token: if true, will be used for authenticated queries on CMR

    Returns:
        fsspec HTTPFileSystem (aiohttp client session)
    """
    if not hasattr(self, "_http_session"):
        self._http_session = self.auth.get_session(bearer_token)

    resp = self._http_session.request(method, url, allow_redirects=True)

    if resp.status_code in [400, 401, 403]:
        new_session = requests.Session()
        resp_req = new_session.request(
            method, url, allow_redirects=True, cookies=self._requests_cookies
        )
        if resp_req.status_code in [400, 401, 403]:
            resp.raise_for_status()
        else:
            self._requests_cookies.update(new_session.cookies.get_dict())
    elif 200 <= resp.status_code < 300:
        self._requests_cookies = self._http_session.cookies.get_dict()
    else:
        resp.raise_for_status()