Module intercom_python_sdk.apis.data_export.api

Data Export API

apis/data_export/api.py

This module contains the Data Export API class, which defines a client for the Data Export API. It is used to interact with the Intercom Data Events API [1] as defined in the Intercom API Reference.


Expand source code
"""
# Data Export API

`apis/data_export/api.py`

This module contains the Data Export API class, which defines a client for the Data Export API.
It is used to interact with the Intercom Data Events API [1] as defined in the Intercom API Reference.

----
- [1] https://developers.intercom.com/intercom-api-reference/reference/create-data-export
"""

# Built-ins
from typing import Union
from datetime import datetime
from validator_collection import validators

# External
from uplink import (
    get, post,
    returns, headers,
    response_handler,
    json, Body, Path
)

# From Current API
from . import schemas as dexport_schemas

# From Current Package
from ...core.api_base import APIBase
from ...core.errors import catch_api_error


@response_handler(catch_api_error)
class DataExportAPI(APIBase):
    URI = "/export/"

    @json
    @returns(dexport_schemas.DataExportJobSchema())  # type: ignore
    @post("content/data")
    def __create_data_export(self, payload: Body(type=dict)):  # type: ignore
        """ Create a new data export. """

    def export(self, created_before: Union[int, datetime], created_after: Union[int, datetime]):
        """ Create a new data export.

        Args:
            created_before (Union[int, datetime]): Datetime or unix epoch to define the upper bound of the export.
            created_after (Union[int, datetime]): Datetime or unix epoch to define the lower bound of the export.

        Returns:
            DataExportJob: The data export job.
        """

        if isinstance(created_before, datetime):
            created_before = int(created_before.timestamp())
        else:
            created_before = validators.integer(created_before, minimum=0)

        if isinstance(created_after, datetime):
            created_after = int(created_after.timestamp())
        else:
            created_after = validators.integer(created_after, minimum=0)

        payload = {
            "created_before": created_before,
            "created_after": created_after
        }

        return self.__create_data_export(payload=payload)

    @headers({"Accept": "application/octet-stream"})
    @get("/download/content/data/{job_identifier}")  # Leading slash as we aren't using the base URI
    def download(self, job_identifier: Path("job_identifier", str)):  # noqa # type: ignore
        """ Download a data export. """

    @returns(dexport_schemas.DataExportJobSchema())  # type: ignore
    @post("cancel/{job_identifier}")
    def cancel(self, job_identifier: Path("job_identifier", str)):  # noqa # type: ignore
        """ Cancel a data export. """

    @returns(dexport_schemas.DataExportJobSchema())  # type: ignore
    @post("content/data/{job_identifier}")
    def get(self, job_identifier: Path("job_identifier", str)):  # noqa # type: ignore
        """ Get a data export. """

Classes

class DataExportAPI (config: Configuration, **kwargs)

The base class for all API classes in the Intercom Python SDK.

Initializes a new instance of the APIBase class. Overrides the Consumer class from the Uplink library to configure the Consumer instance using a Configuration object.

Args

config
The configuration settings for the API.
Expand source code
@response_handler(catch_api_error)
class DataExportAPI(APIBase):
    URI = "/export/"

    @json
    @returns(dexport_schemas.DataExportJobSchema())  # type: ignore
    @post("content/data")
    def __create_data_export(self, payload: Body(type=dict)):  # type: ignore
        """ Create a new data export. """

    def export(self, created_before: Union[int, datetime], created_after: Union[int, datetime]):
        """ Create a new data export.

        Args:
            created_before (Union[int, datetime]): Datetime or unix epoch to define the upper bound of the export.
            created_after (Union[int, datetime]): Datetime or unix epoch to define the lower bound of the export.

        Returns:
            DataExportJob: The data export job.
        """

        if isinstance(created_before, datetime):
            created_before = int(created_before.timestamp())
        else:
            created_before = validators.integer(created_before, minimum=0)

        if isinstance(created_after, datetime):
            created_after = int(created_after.timestamp())
        else:
            created_after = validators.integer(created_after, minimum=0)

        payload = {
            "created_before": created_before,
            "created_after": created_after
        }

        return self.__create_data_export(payload=payload)

    @headers({"Accept": "application/octet-stream"})
    @get("/download/content/data/{job_identifier}")  # Leading slash as we aren't using the base URI
    def download(self, job_identifier: Path("job_identifier", str)):  # noqa # type: ignore
        """ Download a data export. """

    @returns(dexport_schemas.DataExportJobSchema())  # type: ignore
    @post("cancel/{job_identifier}")
    def cancel(self, job_identifier: Path("job_identifier", str)):  # noqa # type: ignore
        """ Cancel a data export. """

    @returns(dexport_schemas.DataExportJobSchema())  # type: ignore
    @post("content/data/{job_identifier}")
    def get(self, job_identifier: Path("job_identifier", str)):  # noqa # type: ignore
        """ Get a data export. """

Ancestors

  • APIBase
  • uplink.builder.Consumer
  • uplink.interfaces.Consumer
  • uplink.builder._Consumer

Class variables

var URI

Methods

def cancel(self, job_identifier: )

Cancel a data export.

Expand source code
@returns(dexport_schemas.DataExportJobSchema())  # type: ignore
@post("cancel/{job_identifier}")
def cancel(self, job_identifier: Path("job_identifier", str)):  # noqa # type: ignore
    """ Cancel a data export. """
def download(self, job_identifier: )

Download a data export.

Expand source code
@headers({"Accept": "application/octet-stream"})
@get("/download/content/data/{job_identifier}")  # Leading slash as we aren't using the base URI
def download(self, job_identifier: Path("job_identifier", str)):  # noqa # type: ignore
    """ Download a data export. """
def export(self, created_before: Union[int, datetime.datetime], created_after: Union[int, datetime.datetime])

Create a new data export.

Args

created_before : Union[int, datetime]
Datetime or unix epoch to define the upper bound of the export.
created_after : Union[int, datetime]
Datetime or unix epoch to define the lower bound of the export.

Returns

DataExportJob
The data export job.
Expand source code
def export(self, created_before: Union[int, datetime], created_after: Union[int, datetime]):
    """ Create a new data export.

    Args:
        created_before (Union[int, datetime]): Datetime or unix epoch to define the upper bound of the export.
        created_after (Union[int, datetime]): Datetime or unix epoch to define the lower bound of the export.

    Returns:
        DataExportJob: The data export job.
    """

    if isinstance(created_before, datetime):
        created_before = int(created_before.timestamp())
    else:
        created_before = validators.integer(created_before, minimum=0)

    if isinstance(created_after, datetime):
        created_after = int(created_after.timestamp())
    else:
        created_after = validators.integer(created_after, minimum=0)

    payload = {
        "created_before": created_before,
        "created_after": created_after
    }

    return self.__create_data_export(payload=payload)
def get(self, job_identifier: )

Get a data export.

Expand source code
@returns(dexport_schemas.DataExportJobSchema())  # type: ignore
@post("content/data/{job_identifier}")
def get(self, job_identifier: Path("job_identifier", str)):  # noqa # type: ignore
    """ Get a data export. """