Python API

The Python API provides programmatic access to gedixr's functionality for custom workflows.

Core Functions

download_data

Download GEDI data using NASA Harmony API based on a time range and spatial subset. Please note that if subset_vector is provided, the download will be subset to the bounding box of the vector geometry and not the exact geometry itself. To perform precise spatial subsetting, use the vector file again during data extraction.
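The distinction matters because a bounding box always covers more area than the geometry it encloses. A plain-Python illustration (no gedixr required; the triangle and shot coordinates below are made up):

```python
# A polygon's bounding box contains locations outside the polygon itself,
# so a bbox-based download returns extra shots that a later vector-based
# extraction will remove.
triangle = [(0.0, 0.0), (4.0, 0.0), (0.0, 4.0)]  # hypothetical AOI

# Bounding box as used for the Harmony subset: (min_lon, min_lat, max_lon, max_lat)
xs = [p[0] for p in triangle]
ys = [p[1] for p in triangle]
bbox = (min(xs), min(ys), max(xs), max(ys))

def in_bbox(pt, bb):
    return bb[0] <= pt[0] <= bb[2] and bb[1] <= pt[1] <= bb[3]

def in_triangle(pt, tri):
    # sign of the cross product for each edge; all same sign -> inside
    def cross(a, b, p):
        return (b[0] - a[0]) * (p[1] - a[1]) - (b[1] - a[1]) * (p[0] - a[0])
    signs = [cross(tri[i], tri[(i + 1) % 3], pt) for i in range(3)]
    return all(s >= 0 for s in signs) or all(s <= 0 for s in signs)

shot = (3.5, 3.5)                   # a hypothetical GEDI shot location
print(in_bbox(shot, bbox))          # True  -> included in the download
print(in_triangle(shot, triangle))  # False -> removed during extraction
```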

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `directory` | str or Path | Directory where downloaded files will be saved. A subdirectory named after the GEDI product will be created within this directory and files will be saved there. | required |
| `gedi_product` | str | GEDI product name: `'L2A'` or `'L2B'`. | required |
| `time_range` | tuple of str | Time range as `(start_date, end_date)` in format `'YYYY-MM-DD'`. | `None` |
| `subset_vector` | str or Path | Path to a vector file for spatial subsetting. Please note that the download will be subset to the bounding box of the vector geometry and not the exact geometry itself. To perform precise spatial subsetting, use the vector file again during data extraction. If provided, takes precedence over `subset_bbox`. | `None` |
| `subset_bbox` | tuple of float | Bounding box as `(min_lon, min_lat, max_lon, max_lat)`. | `None` |
| `job_id` | str | Harmony job ID to resume a previous download. If provided, a new request will not be submitted and the other parameters (`time_range`, `subset_*`) are ignored. | `None` |
| `verbose` | bool | Whether to print progress messages. | `True` |

Returns:

| Type | Description |
| --- | --- |
| tuple of (list of Path, str) | Downloaded file paths and the job ID for potential resumption. |

Examples:

>>> # Initial download
>>> files, job_id = download_data(
...     directory='data/gedi',
...     gedi_product='L2A',
...     time_range=('2020-01-01', '2020-01-31'),
...     subset_bbox=(-10, 40, 5, 50)
... )
>>> # Resume interrupted download
>>> files, job_id = download_data(
...     directory='data/gedi',
...     gedi_product='L2A',
...     job_id=job_id
... )
Source code in gedixr/download.py
def download_data(directory: str | Path,
                  gedi_product: str,
                  time_range: Optional[tuple[str, str]] = None,
                  subset_vector: Optional[str | Path] = None,
                  subset_bbox: Optional[tuple[float, float, float, float]] = None,
                  job_id: Optional[str] = None,
                  verbose: bool = True
                  ) -> tuple[list[Path], str]:
    """
    Download GEDI data using NASA Harmony API based on a time range and spatial subset.
    Please note that if `subset_vector` is provided, the download will be subset to the
    bounding box of the vector geometry and not the exact geometry itself. To perform
    precise spatial subsetting, use the vector file again during data extraction.

    Parameters
    ----------
    directory : str or Path
        Directory where downloaded files will be saved. A subdirectory named after the
        GEDI product will be created within this directory and files will be saved there.
    gedi_product : str
        GEDI product name: 'L2A' or 'L2B'
    time_range : tuple of str, optional
        Time range as (start_date, end_date) in format 'YYYY-MM-DD'
    subset_vector : str or Path, optional
        Path to vector file for spatial subsetting. Please note that the download will 
        be subset to the bounding box of the vector geometry and not the exact geometry 
        itself. To perform precise spatial subsetting, use the vector file again during 
        data extraction. If provided, takes precedence over subset_bbox.
    subset_bbox : tuple of float, optional
        Bounding box as (min_lon, min_lat, max_lon, max_lat).
    job_id : str, optional
        Harmony job ID to resume a previous download. If provided, a new request
        will not be submitted and other parameters (time_range, subset_*) are ignored.
    verbose : bool, default=True
        Whether to print progress messages

    Returns
    -------
    tuple of (list of Path, str)
        Downloaded file paths and the job ID for potential resumption.

    Examples
    --------
    >>> # Initial download
    >>> files, job_id = download_data(
    ...     directory='data/gedi',
    ...     gedi_product='L2A',
    ...     time_range=('2020-01-01', '2020-01-31'),
    ...     subset_bbox=(-10, 40, 5, 50)
    ... )
    >>> # Resume interrupted download
    >>> files, job_id = download_data(
    ...     directory='data/gedi',
    ...     gedi_product='L2A',
    ...     job_id=job_id
    ... )
    """
    short_name = con.PRODUCT_MAPPING.get(gedi_product.upper())
    if short_name is None:
        raise ValueError(f"Parameter 'gedi_product': expected to be one of "
                        f"{list(con.PRODUCT_MAPPING.keys())}; got '{gedi_product}' instead")

    directory = Path(directory)
    if not directory.exists():
        raise ValueError(f"Directory does not exist: {directory}")
    download_dir = directory.joinpath(gedi_product.upper())
    download_dir.mkdir(parents=True, exist_ok=True)

    harmony_client = _authenticate_earthdata()

    job_id_file = download_dir.joinpath('.harmony_job_id')
    if job_id is None:
        if job_id_file.exists():
            saved_job_id = job_id_file.read_text().strip()
            if verbose:
                print(f"Found existing job ID from previous run: {saved_job_id}")
                print("To resume this job, pass job_id parameter.")
                print("Submitting new request...")

        if time_range is not None:
            time_range = {'start': dt.datetime.fromisoformat(time_range[0]),
                          'stop': dt.datetime.fromisoformat(time_range[1])}

        bbox = _get_bbox(subset_vector, subset_bbox)

        capabilities = harmony_client.submit(CapabilitiesRequest(short_name=short_name))
        collection = Collection(id=capabilities['conceptId'])
        request = Request(
            collection=collection,
            spatial=bbox,
            temporal=time_range
        )
        if not request.is_valid():
            raise ValueError(f"Invalid Harmony request: {request.validate()}")

        job_id = harmony_client.submit(request)        
        job_id_file.write_text(job_id)
        if verbose:
            print(f"Job submitted with ID: {job_id}")
            print(f"Job ID saved to: {job_id_file}")
    else:
        if verbose:
            print(f"Resuming job with ID: {job_id}")

        job_id_file.write_text(job_id)

    if verbose:
        print("Files will be processed by Harmony before proceeding with download...")

    try:
        result_json = harmony_client.result_json(job_id, show_progress=verbose)
        status = result_json.get('status', 'unknown')
        if status == 'failed':
            _failed_status(download_dir, job_id, job_id_file, result_json)
        elif status not in ['successful', 'complete']:
            warnings.warn(
                f"Harmony job status is '{status}'. Proceeding with download but results may be incomplete.",
                UserWarning
            )
        if verbose:
            print("Processing complete. Starting download...")

        results = harmony_client.download_all(
            job_id,
            directory=str(download_dir),
            overwrite=True
        )    

        file_paths = [Path(f.result()) for f in results]
        if verbose:
            print(f"Downloaded {len(file_paths)} file(s) to {download_dir}")
        if len(file_paths) == 0:
            warnings.warn(
                "No files were downloaded. This may indicate an issue with the request or data availability.",
                UserWarning
            )

        if job_id_file.exists():
            job_id_file.unlink()

        return file_paths, job_id

    except (KeyboardInterrupt, Exception) as e:
        if verbose:
            if isinstance(e, KeyboardInterrupt):
                print(f"\nDownload interrupted by user. Job ID saved to: {job_id_file}")
            else:
                print(f"\nDownload interrupted due to error: {e}")
                print(f"Job ID saved to: {job_id_file}")
            print("To resume, run:")
            print(f"  download_data(directory='{directory}', gedi_product='{gedi_product}', job_id='{job_id}')")
            print("or use the CLI with --job-id option.")
        raise

extract_data

Extracts data from GEDI L2A or L2B files in HDF5 format using the following steps:

(1) Search a root directory recursively for GEDI L2A or L2B HDF5 files
(2) OPTIONAL: Filter files by month of acquisition
(3) Extract data from each file for specified beams and variables into a DataFrame
(4) OPTIONAL: Filter out shots of poor quality
(5) Convert the DataFrame to a GeoDataFrame including a geometry column
(6) OPTIONAL: Subset shots spatially using intersection via a provided vector file or list of vector files
(7) Save the result as a GeoParquet file or multiple files (one per provided vector file, if applicable)
(8) Return a GeoDataFrame or dictionary of GeoDataFrame objects (one per provided vector file, if applicable)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `directory` | str or Path | Root directory to recursively search for GEDI L2A/L2B files. | required |
| `gedi_product` | str | GEDI product type. Either `'L2A'` or `'L2B'`. | required |
| `variables` | list of tuple of str | List of tuples containing the desired column name in the returned GeoDataFrame and the GEDI layer name to be extracted. Defaults to those retrieved by `gedixr.constants.DEFAULT_VARIABLES['<gedi_product>']`. | `None` |
| `beams` | str or list of str | Which GEDI beams to extract values from. Defaults to all beams (power and coverage beams). Use `'power'` or `'coverage'` for power or coverage beams, respectively. A list of beam names can also be provided, e.g. `['BEAM0101', 'BEAM0110']`. | `None` |
| `filter_month` | tuple of int | Filter GEDI shots by month of the year, e.g. `(6, 8)` to only keep shots that were acquired between June 1st and August 31st of each year. Defaults to `(1, 12)`, which keeps all shots of each year. | `None` |
| `subset_vector` | str or Path or list of str or Path | Path or list of paths to vector files in a fiona-supported format to subset the GEDI data spatially. Default is None, which keeps all shots. Note that the basename of each vector file will be used in the output names, so it is recommended to give those files reasonable names beforehand! | `None` |
| `apply_quality_filter` | bool | Whether to apply a basic quality filter to the GEDI data. Default is True. This basic filtering strategy will filter out shots with quality_flag != 1, degrade_flag != 0, num_detectedmodes > 1, and difference between detected elevation and DEM elevation < 100 m. | `True` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| | GeoDataFrame or dict | In case of an output dictionary, these are the expected key, value pairs: `{'<Vector Basename>': {'geo': Polygon, 'gdf': GeoDataFrame, 'path': Path}}`, where 'geo' is the geometry of the vector file, 'gdf' is the extracted GeoDataFrame for that geometry, and 'path' is the path to the output GeoParquet file. If no vector files were provided, a single GeoDataFrame is returned. |
| `out_path` | Path or None | In case no vector files were provided, the path to the output GeoParquet file is returned. Otherwise, None is returned, as the output paths are included in the output dictionary. |
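The dictionary variant can be consumed like this. The values below are mocked placeholders that only mirror the documented structure; real entries hold a shapely Polygon, a GeoDataFrame, and an output Path, and the file name is illustrative:

```python
from pathlib import Path

# Mocked stand-in for the dictionary returned by extract_data when
# subset_vector is provided ('geo' and 'gdf' are placeholder strings here).
out_dict = {
    'my_aoi': {'geo': '<shapely Polygon>',
               'gdf': '<GeoDataFrame>',
               'path': Path('extracted/example_L2B_1_my_aoi.parquet')},
    'empty_aoi': {'geo': '<shapely Polygon>',
                  'gdf': None,
                  'path': None},  # no shots intersected this geometry
}

for name, entry in out_dict.items():
    if entry['path'] is None:
        print(f"{name}: no shots passed the filters for this geometry")
    else:
        print(f"{name}: result saved to {entry['path']}")
```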

Source code in gedixr/extract.py
def extract_data(directory: str | Path,
                 gedi_product: str,
                 variables: Optional[list[tuple[str, str]]] = None,
                 beams: Optional[str| list[str]] = None,
                 filter_month: Optional[tuple[int, int]] = None,
                 subset_vector: Optional[str | Path | list[str | Path]] = None,
                 apply_quality_filter: bool = True
                 ) -> tuple[GeoDataFrame | dict[str, dict[str, GeoDataFrame | Polygon | Path]], Optional[Path]]:
    """
    Extracts data from GEDI L2A or L2B files in HDF5 format using the following
    steps:

    (1) Search a root directory recursively for GEDI L2A or L2B HDF5 files
    (2) OPTIONAL: Filter files by month of acquisition
    (3) Extract data from each file for specified beams and variables into a Dataframe
    (4) OPTIONAL: Filter out shots of poor quality
    (5) Convert Dataframe to GeoDataFrame including geometry column
    (6) OPTIONAL: Subset shots spatially using intersection via provided vector
        file or list of vector files
    (7) Save the result as a GeoParquet file or multiple files (one per
        provided vector file, if applicable)
    (8) Return a GeoDataFrame or dictionary of GeoDataFrame objects (one per provided
        vector file, if applicable)

    Parameters
    ----------
    directory: str or Path
        Root directory to recursively search for GEDI L2A/L2B files.
    gedi_product: str
        GEDI product type. Either 'L2A' or 'L2B'.
    variables: list of tuple of str, optional
        List of tuples containing the desired column name in the returned
        GeoDataFrame and the GEDI layer name to be extracted. Defaults to those
        retrieved by `gedixr.constants.DEFAULT_VARIABLES['<gedi_product>']`.
    beams: str or list of str, optional
        Which GEDI beams to extract values from? Defaults to all beams (power and
        coverage beams). Use `'power'` or `'coverage'` for power or coverage beams,
        respectively. You can also provide a list of beam names, e.g.:
        `['BEAM0101', 'BEAM0110']`.
    filter_month: tuple of int, optional
        Filter GEDI shots by month of the year? E.g. (6, 8) to only keep shots
        that were acquired between June 1st and August 31st of each year.
        Defaults to (1, 12), which keeps all shots of each year.
    subset_vector: str or Path or list of str or Path, optional
        Path or list of paths to vector files in a fiona supported format to
        subset the GEDI data spatially. Default is None, to keep all shots.
        Note that the basename of each vector file will be used in the output
        names, so it is recommended to give those files reasonable names
        beforehand!
    apply_quality_filter: bool, optional
        Apply a basic quality filter to the GEDI data? Default is True. This basic
        filtering strategy will filter out shots with quality_flag != 1,
        degrade_flag != 0, num_detectedmodes > 1, and difference between detected
        elevation and DEM elevation < 100 m.

    Returns
    -------
    GeoDataFrame or dictionary
        In case of an output dictionary, these are the expected key, value pairs:
            `{'<Vector Basename>': {'geo': Polygon, 'gdf': GeoDataFrame, 'path': Path}}`
            where 'geo' is the geometry of the vector file, 'gdf' is the extracted
            GeoDataFrame for that geometry, and 'path' is the path to the output
            GeoParquet file.
        If no vector files were provided, a single GeoDataFrame is returned.
    out_path: Path or None
        In case no vector files were provided, the path to the output GeoParquet
        file is returned. Otherwise, None is returned as the output paths are
        included in the output dictionary.
    """
    if gedi_product not in con.ALLOWED_PRODUCTS:
        raise RuntimeError(f"Parameter 'gedi_product': expected to be one of "
                           f"{con.ALLOWED_PRODUCTS}; got {gedi_product} instead")

    directory = anc.to_pathlib(x=directory)
    subset_vector = anc.to_pathlib(x=subset_vector) if \
        (subset_vector is not None) else None
    log_handler, now = anc.set_logging(directory, gedi_product)
    anc.log(handler=log_handler, mode='info',
            msg=f"Starting GEDI {gedi_product} data extraction using parameters: "
                f"variables={variables}, beams={beams}, "
                f"filter_month={filter_month}, "
                f"subset_vector={subset_vector}, "
                f"apply_quality_filter={apply_quality_filter}")

    anc.error_tracker.reset() 
    out_dict = None
    if gedi_product == 'L2A':
        variables = con.DEFAULT_VARIABLES['L2A'] if variables is None else variables
        pattern = con.PATTERN_L2A
    else:
        variables = con.DEFAULT_VARIABLES['L2B'] if variables is None else variables
        pattern = con.PATTERN_L2B
    if beams is None:
        beams = con.POWER_BEAMS + con.COVERAGE_BEAMS
    elif beams == 'power':
        beams = con.POWER_BEAMS
    elif beams == 'coverage':
        beams = con.COVERAGE_BEAMS
    if filter_month is None:
        filter_month = (1, 12)
    if subset_vector is not None:
        out_dict = anc.prepare_vec(vec=subset_vector)
    layers = con.DEFAULT_BASE[gedi_product] + variables

    try:
        # (1) Search for GEDI files
        filepaths = [p for p in directory.rglob('*') if p.is_file() and
                     p.match(pattern)]

        if len(filepaths) == 0:
            raise RuntimeError(f"No GEDI {gedi_product} files were found in "
                               f"{directory}.")

        gdf_list_no_spatial_subset = []
        for i, fp in enumerate(tqdm(filepaths)):
            # (2) Filter by month of acquisition
            date = _date_from_gedi_file(gedi_path=fp)
            if filter_month[0] > filter_month[1]:
                filter_month = (filter_month[1], filter_month[0])
            if not filter_month[0] <= date.month <= filter_month[1]:
                msg = (f"Time of acquisition outside of filter range: "
                       f"month_min={filter_month[0]}, "
                       f"month_max={filter_month[1]}")
                anc.log(handler=log_handler, mode='info', file=fp.name, msg=msg)
                continue

            try:
                gedi = h5py.File(fp, 'r')

                # (3) Extract data for specified beams and variables
                df = pd.DataFrame(_from_file(gedi=gedi,
                                             gedi_fp=fp,
                                             gedi_product=gedi_product,
                                             beams=beams,
                                             layers=layers,
                                             acq_time=date,
                                             log_handler=log_handler))

                # (4) Filter by quality flags
                if apply_quality_filter:
                    df = _filter_quality(df=df, log_handler=log_handler, gedi_path=fp)

                # (5) Convert to GeoDataFrame, set 'Shot Number' as index and convert
                # acquisition time to datetime
                df['geometry'] = df.apply(lambda row:
                                          Point(row.longitude, row.latitude),
                                          axis=1)
                df = df.drop(columns=['latitude', 'longitude'])
                gdf = gp.GeoDataFrame(df)
                gdf.set_crs(epsg=4326, inplace=True)
                gdf['acq_time'] = pd.to_datetime(gdf['acq_time'])

                # (6) Subset spatially if any vector files were provided
                if subset_vector is not None:
                    for k, v in out_dict.items():
                        gdf_sub = gdf[gdf.intersects(v['geo'])]
                        if not gdf_sub.empty:
                            if out_dict[k]['gdf'] is None:
                                out_dict[k]['gdf'] = gdf_sub
                            else:
                                gdf_cat = pd.concat([out_dict[k]['gdf'], gdf_sub])
                                out_dict[k]['gdf'] = gdf_cat
                        del gdf_sub
                else:
                    if not gdf.empty:
                        gdf_list_no_spatial_subset.append(gdf)

                gedi.close()
                del df, gdf
            except Exception as msg:
                anc.log(handler=log_handler, mode='exception', file=fp.name,
                        msg=str(msg))
                anc.error_tracker.increment()

        # (7) & (8)
        flt = 1 if apply_quality_filter else 0
        out_dir = directory / 'extracted'
        out_dir.mkdir(exist_ok=True)
        if subset_vector is not None:
            for k, v in out_dict.items():
                v['path'] = None
                if v['gdf'] is not None:
                    out_path = out_dir.joinpath(f'{now}_{gedi_product}_{flt}_{k}.parquet')
                    v['gdf'].to_parquet(out_path)
                    v['path'] = out_path
            return out_dict, None
        else:
            out_path = None
            # make sure that gdf's in list are not all empty 
            if gdf_list_no_spatial_subset:
                out = pd.concat(gdf_list_no_spatial_subset)
                out_path = out_dir.joinpath(f'{now}_{gedi_product}_{flt}.parquet')
                out.to_parquet(out_path)
            else:
                anc.log(handler=log_handler, mode='info',
                        msg="No GEDI shots passed the filtering criteria; "
                            "no output file created.")
                out = GeoDataFrame()
            return out, out_path
    except Exception as msg:
        anc.log(handler=log_handler, mode='exception', msg=str(msg))
        anc.error_tracker.increment()
    finally:
        anc.close_logging(log_handler=log_handler)
        error_count = anc.error_tracker.count
        if error_count > 0:
            print(f"WARNING: {error_count} errors occurred during the extraction "
                  f"process. Please check the log file!")

Post-extraction Functions

load_to_gdf

Loads GEDI L2A and/or L2B GeoParquet or GeoPackage files as GeoDataFrames. If both are provided, they will be merged into a single GeoDataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `l2a` | str or Path | Path to a GEDI L2A GeoParquet or GeoPackage file. | `None` |
| `l2b` | str or Path | Path to a GEDI L2B GeoParquet or GeoPackage file. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `final_gdf` | GeoDataFrame | GeoDataFrame containing the data from the provided GEDI L2A and/or L2B files. |
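A plain-Python sketch of the dispatch logic, with file reading and the actual merge stubbed out (the `dispatch` helper and its return strings are illustrative only, not part of gedixr):

```python
# Simplified stand-in for load_to_gdf's input handling: at least one of the
# two paths must be given; if both are given, the results are merged.
def dispatch(l2a=None, l2b=None):
    if l2a is None and l2b is None:
        raise RuntimeError("At least one of the parameters 'l2a' or 'l2b' "
                           "must be provided!")
    if l2a is not None and l2b is not None:
        return 'read both, then merge_gdf(l2a, l2b)'
    return 'read the single provided file'

print(dispatch(l2a='l2a.parquet'))                     # single-file case
print(dispatch(l2a='l2a.parquet', l2b='l2b.parquet'))  # merged case
```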

Source code in gedixr/xr.py
def load_to_gdf(l2a: Optional[str | Path] = None,
                l2b: Optional[str | Path] = None
                ) -> GeoDataFrame:
    """
    Loads GEDI L2A and/or L2B GeoParquet or GeoPackage files as GeoDataFrames. 
    If both are provided, they will be merged into a single GeoDataFrame.

    Parameters
    ----------
    l2a: str or Path, optional
        Path to a GEDI L2A GeoParquet or GeoPackage file.
    l2b: str or Path, optional
        Path to a GEDI L2B GeoParquet or GeoPackage file.

    Returns
    -------
    final_gdf: GeoDataFrame
        GeoDataFrame containing the data from the provided GEDI L2A and/or L2B files.
    """
    if all(x is None for x in [l2a, l2b]):
        raise RuntimeError("At least one of the parameters 'l2a' or "
                           "'l2b' must be provided!")
    elif all(x is not None for x in [l2a, l2b]):
        gdf_l2a = _reader(l2a)
        gdf_l2b = _reader(l2b)
        final_gdf = merge_gdf(l2a=gdf_l2a, l2b=gdf_l2b)
    else:
        fp = l2a if l2a is not None else l2b
        final_gdf = _reader(fp)
        final_gdf['acq_time'] = pd.to_datetime(final_gdf['acq_time'])
    return final_gdf

merge_gdf

Merges the data of two GeoDataFrames containing GEDI L2A and L2B data. If dictionaries are provided, the function assumes key, value pairs of the dictionary output of gedixr.extract.extract_data. The function will merge the data of matching geometries and return a dictionary of GeoDataFrames.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `l2a` | GeoDataFrame or dict | GeoDataFrame or a dictionary of GeoDataFrames containing GEDI L2A data. | required |
| `l2b` | GeoDataFrame or dict | GeoDataFrame or a dictionary of GeoDataFrames containing GEDI L2B data. | required |
| `how` | str | The type of merge to be performed. Default is 'inner'. | `'inner'` |
| `on` | str or list of str | The column(s) to merge on. Default is `['geometry', 'shot', 'acq_time']`. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `merged_out` | GeoDataFrame or dict | A GeoDataFrame or a dictionary of GeoDataFrames containing the merged GEDI L2A and L2B data. |
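The merge semantics can be sketched with plain pandas. The 'geometry' key is omitted here for brevity (by default gedixr also merges on it), and the column names and values are made up:

```python
import pandas as pd

# Hypothetical per-product tables sharing the keys 'shot' and 'acq_time'.
l2a = pd.DataFrame({'shot': [1, 2, 3],
                    'acq_time': ['2020-06-01'] * 3,
                    'rh98': [12.1, 25.3, 8.7]})
l2b = pd.DataFrame({'shot': [2, 3, 4],
                    'acq_time': ['2020-06-01'] * 3,
                    'cover': [0.45, 0.81, 0.12]})

# 'inner' keeps only shots present in both products; overlapping non-key
# columns would receive the suffixes ('_l2a', '_l2b').
merged = l2b.merge(l2a, how='inner', on=['shot', 'acq_time'],
                   suffixes=('_l2a', '_l2b'))
print(merged[['shot', 'rh98', 'cover']])  # shots 2 and 3 survive the merge
```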

Source code in gedixr/xr.py
def merge_gdf(l2a: GeoDataFrame | dict,
              l2b: GeoDataFrame | dict,
              how: str = 'inner',
              on: Optional[str | list[str]] = None
              ) -> GeoDataFrame | dict:
    """
    Merges the data of two GeoDataFrames containing GEDI L2A and L2B data. If
    dictionaries are provided, the function assumes key, value pairs of the dictionary
    output of `gedixr.extract.extract_data`. The function will merge the data of
    matching geometries and return a dictionary of GeoDataFrames.

    Parameters
    ----------
    l2a: GeoDataFrame or dict
        GeoDataFrame or a dictionary of GeoDataFrames containing GEDI L2A data.
    l2b: GeoDataFrame or dict
        GeoDataFrame or a dictionary of GeoDataFrames containing GEDI L2B data.
    how: str, optional
        The type of merge to be performed. Default is 'inner'.
    on: str or list of str, optional
        The column(s) to merge on. Default is ['geometry', 'shot', 'acq_time'].

    Returns
    -------
    merged_out: GeoDataFrame or dict
        A GeoDataFrame or a dictionary of GeoDataFrames containing the merged
        GEDI L2A and L2B data.
    """
    suffixes = ('_l2a', '_l2b')
    if on is None:
        on = ['geometry', 'shot', 'acq_time']
    if all([isinstance(gdf, dict) for gdf in [l2a, l2b]]):
        if len(l2a.keys()) != len(l2b.keys()):
            print(f"WARNING: The provided dictionaries contain data from a "
                  f"different number of geometries: "
                  f"({len(l2a.keys())} vs. {len(l2b.keys())})."
                  f"\nOnly data of matching geometries will be merged and returned.")

        matched = set(l2a.keys()).intersection(set(l2b.keys()))
        if len(matched) == 0:
            raise RuntimeError("No matching geometries found between the provided "
                               "dictionaries.")

        merged_out = {}
        for aoi in matched:
            _run_checks(l2a[aoi], l2b[aoi], key=aoi)
            merged_gdf = l2b[aoi]['gdf'].merge(l2a[aoi]['gdf'],
                                               how=how, on=on, suffixes=suffixes)
            merged_out[aoi] = {}
            merged_out[aoi]['gdf'] = merged_gdf
            merged_out[aoi]['geo'] = l2a[aoi]['geo']
    elif all([isinstance(gdf, GeoDataFrame) for gdf in [l2a, l2b]]):
        _compare_gdfs(l2a, l2b)
        merged_out = l2b.merge(l2a, how=how, on=on, suffixes=suffixes)
    else:
        raise RuntimeError("The provided input is not supported.")
    return merged_out

gdf_to_xr

Rasterizes a GeoDataFrame containing GEDI L2A/L2B data to an xarray Dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `gdf` | GeoDataFrame | GeoDataFrame containing GEDI L2A/L2B data. | required |
| `measurements` | list of str | List of measurement names (i.e. GEDI variables) to be included. Default is None, which will include all measurements. | `None` |
| `resolution` | tuple of float | A tuple of the pixel spacing of the returned data (Y, X). This includes the direction (as indicated by a positive or negative number). Default is (-0.0003, 0.0003), which corresponds to a spacing of roughly 30 m. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `cube` | Dataset | An xarray Dataset containing the rasterized GEDI data. |
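As a rough sanity check on the default resolution (an approximation assuming about 111.32 km per degree of latitude near the equator):

```python
# 0.0003 degrees of latitude is roughly 33 m, so "30 m" is an approximation;
# the longitudinal spacing additionally shrinks with cos(latitude).
deg = 0.0003
meters = deg * 111_320  # approximate meters per one degree of latitude
print(round(meters, 1))
```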

Source code in gedixr/xr.py
def gdf_to_xr(gdf: GeoDataFrame,
              measurements: Optional[list[str]] = None,
              resolution: Optional[tuple[float, float]] = None
              ) -> Dataset:
    """
    Rasterizes a GeoDataFrame containing GEDI L2A/L2B data to an xarray Dataset.

    Parameters
    ----------
    gdf: GeoDataFrame
        GeoDataFrame containing GEDI L2A/L2B data.
    measurements: list of str, optional
        List of measurement names (i.e. GEDI variables) to be included.
        Default is None, which will include all measurements.
    resolution: tuple of float, optional
        A tuple of the pixel spacing of the returned data (Y, X). This includes
        the direction (as indicated by a positive or negative number). Default
        is (-0.0003, 0.0003), which corresponds to a spacing of 30 m.

    Returns
    -------
    cube: Dataset
        An xarray Dataset containing the rasterized GEDI data.
    """
    if resolution is None:
        resolution = (-0.0003, 0.0003)
    xr_ds = make_geocube(vector_data=gdf,
                         measurements=measurements,
                         output_crs=f'epsg:{gdf.crs.to_epsg()}',
                         resolution=resolution)
    return xr_ds