Module intercom_python_sdk.apis.data_export.models

Data Export API Models

apis/data_exports/models.py

This module contains models used to interact with the Intercom Data Exports API [1]. These models provide object oriented interfaces for the schemas defined in apis/data_exports/schemas.py.


Expand source code
"""
# Data Export API Models

`apis/data_exports/models.py`

This module contains models used to interact with the Intercom Data Exports API [1].
These models provide object oriented interfaces for the schemas defined in `apis/data_exports/schemas.py`.

---
- [1] https://developers.intercom.com/intercom-api-reference/reference/the-export-job-model
"""

# Built-ins
import time
import requests
from typing import (
    TYPE_CHECKING,
)

# From Current Package
from ...core.model_base import ModelBase

if TYPE_CHECKING:
    from .api import DataExportAPI


class DataExportJob(ModelBase):
    """ A data export job. """
    def __init__(self, *args, **kwargs):
        self.__job_identifier = kwargs.get("job_identifier", "")
        self.__status = kwargs.get("status", "")
        self.__download_expires_at = kwargs.get("download_expires_at", "")
        self.__download_url = kwargs.get("download_url", "")
        self._api_client: DataExportAPI = kwargs.get("api_client")  # type: ignore

    @property
    def api_client(self) -> 'DataExportAPI':
        return self._api_client

    @api_client.setter
    def api_client(self, value: 'DataExportAPI'):
        self._api_client = value

    @property
    def job_identifier(self) -> str:
        """ The unique identifier for this data export job. """
        return self.__job_identifier

    @property
    def status(self) -> str:
        """ Status of this data export job.

        Possible values:
            - pending
            - in_progress
            - completed
            - failed
            - no_data
            - cancelled
        """
        return self.__status

    @property
    def download_expires_at(self) -> str:
        """ Time after which the download will be unavailable. """
        return self.__download_expires_at

    @property
    def download_url(self) -> str:
        """ URL to download the data export. """
        return self.__download_url

    def update(self, **kwargs) -> 'DataExportJob':
        """ Update this data export job to fetch it's current status and values. """
        job = self.api_client.get(job_identifier=self.job_identifier)
        self.__update_self(job)  # type: ignore

        return self

    def cancel(self) -> 'DataExportJob':
        """ Cancel this data export job. """
        job = self.api_client.cancel(job_identifier=self.job_identifier)
        self.__update_self(job)  # type: ignore

        return self

    def wait_for_completion(self, timeout=120, frequency=5):
        """ Wait for the data export job to complete.

        Be aware that this method will block your thread until the job is completed.
        Using this method without a timeout is not recommended.

        Params:
            timeout (int): The maximum number of seconds to wait for the job to complete.
            frequency (int): The number of seconds to wait between checking the job status.
        Returns:
            bool: True once the job is completed (regardless of success or failure)
        Raises:
            TimeoutError: If the job does not complete within the specified timeout.
        """
        timeout_time = time.time() + timeout
        while self.status == "pending":
            if time.time() > timeout_time:
                raise TimeoutError(
                    f"Data export job {self.job_identifier} \
                    failed to complete within {timeout} seconds."
                )

            time.sleep(frequency)
            self.update()

        return True

    def download(self):
        """ Download this data export job.

        Returns:
            bytes: The data export file. Generally a gzip compressed CSV file.
        """
        if self.download_url:
            response = self.api_client.download(job_identifier=self.job_identifier)
            if not isinstance(response, requests.Response):
                return response
            if response.ok:
                return response.content
            else:
                raise ValueError(f"Failed to download data export job {self.job_identifier} \
                                       with error: {response.text}")

    def __update_self(self, job: 'DataExportJob'):
        self.__status = job.status
        self.__download_expires_at = job.download_expires_at
        self.__download_url = job.download_url

Classes

class DataExportJob (*args, **kwargs)

A data export job.

Expand source code
class DataExportJob(ModelBase):
    """ A data export job. """
    def __init__(self, *args, **kwargs):
        self.__job_identifier = kwargs.get("job_identifier", "")
        self.__status = kwargs.get("status", "")
        self.__download_expires_at = kwargs.get("download_expires_at", "")
        self.__download_url = kwargs.get("download_url", "")
        self._api_client: DataExportAPI = kwargs.get("api_client")  # type: ignore

    @property
    def api_client(self) -> 'DataExportAPI':
        return self._api_client

    @api_client.setter
    def api_client(self, value: 'DataExportAPI'):
        self._api_client = value

    @property
    def job_identifier(self) -> str:
        """ The unique identifier for this data export job. """
        return self.__job_identifier

    @property
    def status(self) -> str:
        """ Status of this data export job.

        Possible values:
            - pending
            - in_progress
            - completed
            - failed
            - no_data
            - cancelled
        """
        return self.__status

    @property
    def download_expires_at(self) -> str:
        """ Time after which the download will be unavailable. """
        return self.__download_expires_at

    @property
    def download_url(self) -> str:
        """ URL to download the data export. """
        return self.__download_url

    def update(self, **kwargs) -> 'DataExportJob':
        """ Update this data export job to fetch it's current status and values. """
        job = self.api_client.get(job_identifier=self.job_identifier)
        self.__update_self(job)  # type: ignore

        return self

    def cancel(self) -> 'DataExportJob':
        """ Cancel this data export job. """
        job = self.api_client.cancel(job_identifier=self.job_identifier)
        self.__update_self(job)  # type: ignore

        return self

    def wait_for_completion(self, timeout=120, frequency=5):
        """ Wait for the data export job to complete.

        Be aware that this method will block your thread until the job is completed.
        Using this method without a timeout is not recommended.

        Params:
            timeout (int): The maximum number of seconds to wait for the job to complete.
            frequency (int): The number of seconds to wait between checking the job status.
        Returns:
            bool: True once the job is completed (regardless of success or failure)
        Raises:
            TimeoutError: If the job does not complete within the specified timeout.
        """
        timeout_time = time.time() + timeout
        while self.status == "pending":
            if time.time() > timeout_time:
                raise TimeoutError(
                    f"Data export job {self.job_identifier} \
                    failed to complete within {timeout} seconds."
                )

            time.sleep(frequency)
            self.update()

        return True

    def download(self):
        """ Download this data export job.

        Returns:
            bytes: The data export file. Generally a gzip compressed CSV file.
        """
        if self.download_url:
            response = self.api_client.download(job_identifier=self.job_identifier)
            if not isinstance(response, requests.Response):
                return response
            if response.ok:
                return response.content
            else:
                raise ValueError(f"Failed to download data export job {self.job_identifier} \
                                       with error: {response.text}")

    def __update_self(self, job: 'DataExportJob'):
        self.__status = job.status
        self.__download_expires_at = job.download_expires_at
        self.__download_url = job.download_url

Ancestors

Instance variables

var api_client : DataExportAPI
Expand source code
@property
def api_client(self) -> 'DataExportAPI':
    return self._api_client
var download_expires_at : str

Time after which the download will be unavailable.

Expand source code
@property
def download_expires_at(self) -> str:
    """ Time after which the download will be unavailable. """
    return self.__download_expires_at
var download_url : str

URL to download the data export.

Expand source code
@property
def download_url(self) -> str:
    """ URL to download the data export. """
    return self.__download_url
var job_identifier : str

The unique identifier for this data export job.

Expand source code
@property
def job_identifier(self) -> str:
    """ The unique identifier for this data export job. """
    return self.__job_identifier
var status : str

Status of this data export job.

Possible values: - pending - in_progress - completed - failed - no_data - cancelled

Expand source code
@property
def status(self) -> str:
    """ Status of this data export job.

    Possible values:
        - pending
        - in_progress
        - completed
        - failed
        - no_data
        - cancelled
    """
    return self.__status

Methods

def cancel(self) ‑> DataExportJob

Cancel this data export job.

Expand source code
def cancel(self) -> 'DataExportJob':
    """ Cancel this data export job. """
    job = self.api_client.cancel(job_identifier=self.job_identifier)
    self.__update_self(job)  # type: ignore

    return self
def download(self)

Download this data export job.

Returns

bytes
The data export file. Generally a gzip compressed CSV file.
Expand source code
def download(self):
    """ Download this data export job.

    Returns:
        bytes: The data export file. Generally a gzip compressed CSV file.
    """
    if self.download_url:
        response = self.api_client.download(job_identifier=self.job_identifier)
        if not isinstance(response, requests.Response):
            return response
        if response.ok:
            return response.content
        else:
            raise ValueError(f"Failed to download data export job {self.job_identifier} \
                                   with error: {response.text}")
def update(self, **kwargs) ‑> DataExportJob

Update this data export job to fetch it's current status and values.

Expand source code
def update(self, **kwargs) -> 'DataExportJob':
    """ Update this data export job to fetch it's current status and values. """
    job = self.api_client.get(job_identifier=self.job_identifier)
    self.__update_self(job)  # type: ignore

    return self
def wait_for_completion(self, timeout=120, frequency=5)

Wait for the data export job to complete.

Be aware that this method will block your thread until the job is completed. Using this method without a timeout is not recommended.

Params

timeout (int): The maximum number of seconds to wait for the job to complete. frequency (int): The number of seconds to wait between checking the job status.

Returns

bool
True once the job is completed (regardless of success or failure)

Raises

TimeoutError
If the job does not complete within the specified timeout.
Expand source code
def wait_for_completion(self, timeout=120, frequency=5):
    """ Wait for the data export job to complete.

    Be aware that this method will block your thread until the job is completed.
    Using this method without a timeout is not recommended.

    Params:
        timeout (int): The maximum number of seconds to wait for the job to complete.
        frequency (int): The number of seconds to wait between checking the job status.
    Returns:
        bool: True once the job is completed (regardless of success or failure)
    Raises:
        TimeoutError: If the job does not complete within the specified timeout.
    """
    timeout_time = time.time() + timeout
    while self.status == "pending":
        if time.time() > timeout_time:
            raise TimeoutError(
                f"Data export job {self.job_identifier} \
                failed to complete within {timeout} seconds."
            )

        time.sleep(frequency)
        self.update()

    return True