comotion.dash module¶
-
class
comotion.dash.DashBulkUploader(config: comotion.dash.DashConfig)[source]¶ Class to handle multiple loads with utility functions by leveraging the Load class. Since a Load creates an upload to a lake table, DashBulkUploader helps to manage uploads to several lake tables with 1 object.
Example of upload with a DashBulkUploader instance:
config = DashConfig(Auth(orgname = 'my_org_name'))
uploader = DashBulkUploader(config = config)

# A better use would be to loop through a config to run the following code repeatedly.
# A single upload is shown for demonstration purposes.
my_lake_table = 'v1_inforce_policies'

# Create the load in the uploader
uploader.add_load(
    table_name = my_lake_table,
    check_sum = {'count(*)': 50000},  # Alternatively, provide the track_rows_uploaded arg as True
    load_as_service_client_id = '0'
)

# Add the relevant datasources to the load. This can point to a directory to upload all contained files.
uploader.add_data_to_load(
    table_name = my_lake_table,
    data = 'data/inforce_policies'
)

# View uploads added
print(uploader.uploads)

# Use execute_upload or execute_multiple_uploads if only certain uploads should be kicked off.
uploader.execute_all_uploads()

# View status of uploads. This also refreshes uploader.uploads
print(uploader.get_load_info())

# Remove uploads/datasources if you want to re-use the same instance
uploader.remove_load(table_name = my_lake_table)
-
add_data_to_load(table_name: str, data: Union[str, pandas.core.frame.DataFrame, comotion.dash.Query], file_key: str = None, source_type: str = None, **pd_read_kwargs) → None[source]¶ Adds data to an existing load for a specified lake table. This function supports adding data from various sources including dataframes, directories, and files.
- Parameters
table_name (str) – The name of the lake table to which data will be added. A load should already be added for this table_name.
data (Union[str, pd.DataFrame, Query]) – The data to be added. Can be a path to a file or directory, a pandas.DataFrame or a dash.Query. See Load for how different upload types will be executed.
file_key (str, optional) – Optional custom key for the file. This will ensure idempotence. Must be lowercase and can include underscores. If not provided, a random file key will be generated using DashBulkUploader.create_file_key(). If a directory is provided as the source of data, file_key is ignored and a random file key is generated for each file in the directory. If multiple files are uploaded to the same load with the same file_key, only the last one will be pushed to the lake.
source_type (str, optional) – The type of data source. Can be ‘df’ for DataFrame, ‘dir’ for directory, or ‘file’ for file. If not specified, the function will attempt to infer the source type. If a directory is provided, the function loops through the paths in the directory from listdir() and adds each valid file as a data source for the lake table.
pd_read_kwargs (dict, optional) – Additional keyword arguments to pass to the pandas read function (one of [pd.read_csv, pd.read_parquet, pd.read_json, pd.read_excel]).
- Raises
ValueError – If no existing load is found for the specified table, if the table name is not lowercase, or if the source type cannot be identified.
KeyError – If neither check_sum nor track_rows_uploaded is provided.
- Returns
- Return type
None
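Example (a minimal sketch of adding an in-memory DataFrame as a data source; the table name, file key and column values are illustrative assumptions):
import pandas as pd
from comotion.auth import Auth
from comotion.dash import DashConfig, DashBulkUploader

uploader = DashBulkUploader(config = DashConfig(Auth(orgname = 'my_org_name')))

# A load must already exist for the table before data can be added
uploader.add_load(
    table_name = 'v1_claims',          # hypothetical lake table
    track_rows_uploaded = True,
    load_as_service_client_id = '0'
)

# Add a DataFrame directly; source_type can be given explicitly or left to be inferred
df = pd.DataFrame({'claim_id': [1, 2, 3], 'amount': [100.0, 250.5, 75.25]})
uploader.add_data_to_load(
    table_name = 'v1_claims',
    data = df,
    file_key = 'claims_batch_01',      # lowercase with underscores; ensures idempotence
    source_type = 'df'
)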
-
add_load(table_name: str, check_sum: Optional[Dict[str, Union[int, float, str]]] = None, modify_lambda: Callable = None, load_type: str = 'APPEND_ONLY', load_as_service_client_id: str = None, partitions: Optional[List[str]] = None, track_rows_uploaded: bool = False, path_to_output_for_dryrun: str = None, chunksize: int = None) → None[source]¶ Creates a new load for a specified lake table. This function initializes the load process, ensuring that the table name is in lowercase and that a checksum or row tracking is provided for data integrity. Created loads can be fetched using the DashBulkUploader.uploads class variable.
- Parameters
table_name (str) – The name of the lake table. Must be in lowercase.
check_sum (Optional[Dict[str, Union[int, float, str]]]) – Checksum data for the files to be committed. The checksum should be a dictionary with Presto/Trino expressions as keys and their expected results as values.
modify_lambda (Callable, optional) – A lambda function to modify the data before loading.
load_type (str, default 'APPEND_ONLY') – The type of load operation. Default is ‘APPEND_ONLY’.
load_as_service_client_id (str, optional) – The service client ID to use for the load. If service_client is not a field in the source data added to the load, this must be provided or the load will fail on commit.
partitions (Optional[List[str]], optional) – List of partitions for the load to improve query efficiency from the Dash lake.
track_rows_uploaded (bool, default False) – Whether to track the number of rows uploaded for the load.
path_to_output_for_dryrun (str, optional) – If specified, no upload will be made to dash, but files will be saved to the location specified. This is useful for testing.
chunksize (int, optional) – Data source will be broken into chunks with chunksize rows before uploading.
- Raises
ValueError – If the table name contains uppercase characters or if a load has already been created for the table.
KeyError – If neither check_sum nor track_rows_uploaded is provided.
- Returns
- Return type
None
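Example (a sketch of creating a load with an explicit checksum and partitions; all values, including the partition column, are illustrative assumptions):
from comotion.auth import Auth
from comotion.dash import DashConfig, DashBulkUploader

uploader = DashBulkUploader(config = DashConfig(Auth(orgname = 'my_org_name')))

uploader.add_load(
    table_name = 'v1_inforce_policies',      # must be lowercase
    check_sum = {
        'count(*)': 50000,                   # Presto/Trino expression -> expected result
        'sum(face_amount)': 1250000
    },
    load_type = 'APPEND_ONLY',
    load_as_service_client_id = '0',
    partitions = ['policy_year'],            # hypothetical partition column
    chunksize = 30000
)

print(uploader.uploads)   # the created load is tracked here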
-
execute_all_uploads()[source]¶ Uses the execute_upload function for all loads created with the DashBulkUploader.
-
execute_multiple_uploads(table_names: List[str], max_workers: int = None)[source]¶ Uses the execute_upload function for each table name in the list provided.
-
execute_upload(table_name: str, max_workers: int = None) → None[source]¶ Executes the upload process for a specified lake table. This function uses multi-threading to upload data sources concurrently and commits the load upon completion.
- Parameters
table_name (str) – The name of the lake table to which data will be uploaded.
max_workers (int) – The maximum number of threads to use for concurrent uploads (passed to concurrent.futures.ThreadPoolExecutor)
- Raises
ValueError – If no existing load is found for the specified table.
- Returns
- Return type
None
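Example (a sketch of kicking off only selected uploads; assumes loads and data sources were already added to uploader as in the class example above, and the table names are illustrative):
# Upload a single table, controlling the thread pool size
uploader.execute_upload(table_name = 'v1_inforce_policies', max_workers = 4)

# Or execute several specific tables
uploader.execute_multiple_uploads(
    table_names = ['v1_inforce_policies', 'v1_claims'],
    max_workers = 4
)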
-
get_load_info()[source]¶ Retrieves the load information for all loads created. This also updates the load status for each Load created by the DashBulkUploader.
- Returns
load_info – A dictionary containing the load information for each table, with table names as keys and their respective load statuses as values.
- Return type
dict
- Raises
Exception – If an error occurs while retrieving the load information for any table, it is caught and printed, but the function continues to retrieve the remaining load statuses.
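Example (a sketch of checking the status of all loads after execution; assumes an uploader as in the class example above):
load_info = uploader.get_load_info()
for table_name, status in load_info.items():
    print(table_name, status)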
-
-
class
comotion.dash.DashConfig(auth: comotion.auth.Auth, zone: str = None)[source]¶ Object containing configuration information for Dash API
-
auth¶ comotion.Auth object holding information about authentication
- Type
comotion.Auth
-
zone¶ The zone to use for the API. If not provided, defaults to None, i.e. the main zone.
- Type
str, optional
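Example (a minimal configuration sketch; the orgname and zone values are assumptions):
from comotion.auth import Auth
from comotion.dash import DashConfig

# Default behaviour: zone is None, i.e. the main zone
config = DashConfig(Auth(orgname = 'my_org_name'))

# Target a specific zone if your org uses one
config_zoned = DashConfig(Auth(orgname = 'my_org_name'), zone = 'my_zone')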
-
-
class
comotion.dash.Load(config: comotion.dash.DashConfig, load_type: str = None, table_name: str = None, load_as_service_client_id: str = None, partitions: Optional[List[str]] = None, load_id: str = None, track_rows_uploaded: bool = None, path_to_output_for_dryrun: str = None, modify_lambda: Callable = None, chunksize: int = None)[source]¶ The Load object starts and tracks a multi-file load to a single lake table on Comotion Dash
Initialising this class starts a Load on Comotion Dash and stores the resulting load_id.
The load then needs to be committed with valid checksums to be pushed to the lake.
If you wish to work with an existing load, then supply only the load_id parameter
Example of upload with a Load instance:
# Create the load
load = Load(
    config = DashConfig(Auth('orgname')),
    table_name = 'v1_inforce_policies',
    load_as_service_client_id = '0'
)
print(load.load_id)  # It can be useful to track load ids so you can return to or fix an existing load later

# Upload files with the Load object
for file, file_key in zip(file_path_list, file_keys):
    load.upload_file(
        data = file,
        file_key = file_key
    )

# Commit the load - this validates the files and pushes to the table_name specified.
# Check that the data uploaded matches what you expected before pushing to the lake.
# *Also see the track_rows_uploaded option
load.commit(
    check_sum = {
        'sum(face_amount)': 20000,
        'count(distinct policy_number)': 1000
    }
)

# Run this at any time to get the latest information on the load
print(load.get_load_info())
-
commit(check_sum: Optional[Dict[str, Union[int, float, str]]] = None)[source]¶ Kicks off the commit of the load. A checksum must be provided which is checked on the server side to ensure that the data provided has integrity. This is automatically created if you specify track_rows_uploaded = True when creating the load.
- Parameters
check_sum (Dict[str, Union[int, float, str]] (optional)) –
Checksum data for the files to be committed. Checksums must be in the form of a dictionary, with Presto/Trino expressions as keys and the expected results as values.
A checksum is not required if track_rows_uploaded was set to True for the load. In that case a ‘count(*)’ checksum is built from the number of rows uploaded and added as an extra checksum.
Example:
{ "count(*)" : 53, "sum(my_value)": 123.3 }
-
generate_presigned_url_for_file_upload(file_key: str = None) → comodash_api_client_lowlevel.models.file_upload_response.FileUploadResponse[source]¶ Generates presigned URLs and STS credentials for a new file upload.
- Parameters
file_key (str, optional) – Optional custom key for the file. This will ensure idempotence. If multiple files are uploaded to the same load with the same file_key, only the last one will be loaded. Must be lowercase and can include underscores.
- Returns
Model containing all the relevant credentials to be able to upload a file to S3 as part of the load. This includes the following attributes:
- presigned_url (str)
Presigned URL data for S3 file upload. The file can be posted to this endpoint using any AWS S3 compatible toolset. Temporary credentials are included in the URL, so no other credentials are required.
- sts_credentials (dict)
As an alternative to the presigned_url, these temporary AWS STS credentials can be used to upload the file to the location specified by path and bucket. This is required for various advanced toolsets, including AWS Wrangler.
- path (str)
Path of the file in the S3 bucket. See the description of sts_credentials.
- bucket (str)
Name of the S3 bucket. See the description of sts_credentials.
- Return type
FileUploadResponse
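Example (a sketch of obtaining upload credentials and reading the documented attributes; the table name and file key are assumptions, and the actual S3 upload step is left to your preferred S3-compatible toolset):
from comotion.auth import Auth
from comotion.dash import DashConfig, Load

load = Load(
    config = DashConfig(Auth('orgname')),
    table_name = 'v1_inforce_policies',
    load_as_service_client_id = '0'
)

upload_response = load.generate_presigned_url_for_file_upload(file_key = 'policies_part_1')

print(upload_response.presigned_url)      # post the file here with any S3-compatible tool
print(upload_response.bucket)             # or upload to bucket/path using ...
print(upload_response.path)               # ... the temporary STS credentials below
print(upload_response.sts_credentials)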
-
get_load_info() → comodash_api_client_lowlevel.models.load.Load[source]¶ Gets the state of the load
- Returns
Model containing all the load info, with the following attributes
- load_status
Status of the load, one of OPEN, PROCESSING, FAIL or SUCCESS
- error_type
Type of error if the load status is FAIL.
- error_messages
Detailed error messages if the load status is FAIL.
- Return type
LoadInfo
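Example (a sketch of polling a committed load until it leaves the PROCESSING state, using the documented load_status values; assumes a Load instance named load on which commit() has been called):
import time

load_info = load.get_load_info()
while load_info.load_status == 'PROCESSING':
    time.sleep(30)
    load_info = load.get_load_info()

if load_info.load_status == 'FAIL':
    print(load_info.error_type)
    print(load_info.error_messages)
else:
    print(load_info.load_status)   # OPEN if not yet committed, otherwise SUCCESS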
-
upload_dash_query(data: comotion.dash.Query, file_key: str = None, max_workers: int = None, **pd_read_kwargs)[source]¶ Uploads the result of a dash.Query object to the specified lake table, or a local file if path_to_output_for_dryrun was specified. The result is read into a pandas.DataFrame and uploaded with the Load.upload_df() function. Note an index is added to the end of the file key to uniquely identify chunks uploaded.
- Parameters
data (Query) – The Query whose result is to be uploaded.
file_key (str, optional) – A unique key for the file being uploaded. If not provided, a key will be generated.
max_workers (int, optional) – The maximum number of threads to use for concurrent uploads (passed to concurrent.futures.ThreadPoolExecutor)
**pd_read_kwargs – Additional keyword arguments to pass to the pandas function. Note that filepath_or_buffer, chunksize and dtype are passed by default and so duplicating those here could cause issues.
- Returns
A list of responses from the load upload API call for each chunk.
- Return type
List[Any]
- Raises
ValueError – If an error occurs during the upload of any chunk or during the Query.
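Example (a sketch of landing the result of a Dash query into a lake table; the query text, file key, and the load and config objects are illustrative assumptions):
from comotion.dash import Query

query = Query(
    config = config,
    query_text = "select policy_number, face_amount from v1_inforce_policies"
)

load.upload_dash_query(
    data = query,
    file_key = 'inforce_extract',   # a chunk index is appended to this key per chunk
    max_workers = 4
)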
-
upload_df(data: pandas.core.frame.DataFrame, file_key: str = None)[source]¶ Uploads a pandas.DataFrame as a parquet file to an S3 bucket using a presigned URL. If path_to_output_for_dryrun is specified for the load, write the file to a local directory instead. Before uploading, modify_lambda is applied. Then, column names are forced to be lowercase and spaces are replaced with underscores. This is to prevent potential schema issues in the Dash lake.
- Parameters
data (pandas.DataFrame) – The DataFrame to be uploaded. This should be a pandas.DataFrame object.
file_key (str, optional) – Optional custom key for the file to be uploaded. If not provided, a random file key is created. See Load.create_file_key(). This will ensure idempotence. If multiple files are uploaded to the same load with the same file_key, only the last one will be pushed to the lake. Must be lowercase and can include underscores.
- Return type
None
Example:
import pandas as pd

load = Load(dashconfig)
df = pd.read_parquet('path/to/file.parquet')
load.upload_df(data = df, file_key = 'my_file_id')
-
upload_file(data, file_key: str = None, use_file_name_as_key: bool = False, max_workers: int = None, **pd_read_kwargs)[source]¶ Uploads a file to the lake table specified in the load, or a local file if path_to_output_for_dryrun was specified. The file provided is read into a pandas.DataFrame using pd_read_kwargs with an appropriate pandas function. Note an index is added to the end of the file key to uniquely identify chunks uploaded.
- Parameters
data (Any) – The file to be uploaded. This should be readable by pandas.read_csv, pandas.read_parquet, pandas.read_json or pandas.read_excel.
file_key (str, optional) – A unique key for the file being uploaded. If not provided, a key will be generated.
use_file_name_as_key (bool, optional) – If True, the file name will be used as the file key. If False, a random key will be generated. Will throw an error when this is True and a file key is provided.
max_workers (int, optional) – The maximum number of threads to use for concurrent uploads (passed to concurrent.futures.ThreadPoolExecutor)
**pd_read_kwargs – Additional keyword arguments to pass to the pandas read function (one of [pd.read_csv, pd.read_parquet, pd.read_json, pd.read_excel]). You should not pass the variable pointing to the file here (e.g. filepath_or_buffer in pandas.read_csv), as this is passed in the data parameter. Chunksize and nrows should also not be provided as extra parameters. If you do not provide dtype, dtype will be determined automatically from the first chunk of data.
- Returns
A list of responses from the load upload API call for each chunk.
- Return type
List[Any]
- Raises
ValueError – If the file type cannot be determined or if an error occurs during the upload of any chunk.
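Example (a sketch of uploading a delimited file, passing reader options through pd_read_kwargs; the path, separator and file key are assumptions, and load is a Load instance as in the class example above):
responses = load.upload_file(
    data = 'data/inforce_policies/policies_2024.csv',
    file_key = 'policies_2024',    # lowercase; a chunk index is appended per chunk
    sep = '|',                     # forwarded to pandas.read_csv via pd_read_kwargs
    encoding = 'utf-8'
)
print(len(responses))              # one response per uploaded chunk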
-
-
class
comotion.dash.Migration(config: comotion.dash.DashConfig)[source]¶ The Migration object starts and tracks a migration from lake v1 to lake v2. It can only be run once, after which the old lake will be disabled.
-
start(migration_type: str = 'FLASH_SCHEMA', clear_out_new_lake: bool = False)[source]¶ Starts a migration.
Migrations can take a number of hours to complete. So get a cup of coffee.
Initialising this class starts the migration on Comotion Dash. If a migration is already in progress, initialisation will monitor the active load.
There are two types of migration, set by migration_type:
- FULL_MIGRATION: runs a full migration by copying all the data across, updating the insights tables and deactivating data loads to the old lake.
- FLASH_SCHEMA: copies the schema and one line of data per table to the new lake. This is useful for developing and testing ETL scripts.
There is an option to clear out the new lake on migration, set by the boolean parameter clear_out_new_lake. This is useful when testing has taken place and data needs to be cleared. If clear_out_new_lake is set to False, the migration will fail if there is data in the new lake.
- Parameters
config (DashConfig) – Object of type DashConfig including configuration details
migration_type (str) – whether to run a full migration (“FULL_MIGRATION”) or only copy the schema of the lake across to the new lake (“FLASH_SCHEMA”)
clear_out_new_lake (bool) – whether to clear out the new lake on migration.
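Example (a sketch of flashing the schema before committing to a full migration; assumes a DashConfig object named config):
from comotion.dash import Migration

migration = Migration(config = config)

# Copy the schema plus one row per table, useful for testing ETL against the new lake
migration.start(migration_type = 'FLASH_SCHEMA', clear_out_new_lake = False)

# Once testing is complete, run the full migration, clearing out the test data first
# migration.start(migration_type = 'FULL_MIGRATION', clear_out_new_lake = True)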
-
-
class
comotion.dash.Query(config: comotion.dash.DashConfig, query_text: str = None, query_id: str = None)[source]¶ The query object starts and tracks a query on Comotion Dash.
Initialising this class runs a query on Comotion Dash and stores the resulting query id in query_id
-
COMPLETED_STATES = ['SUCCEEDED', 'CANCELLED', 'FAILED']¶
-
SUCCEEDED_STATE = 'SUCCEEDED'¶
-
download_csv(output_file_path, fail_if_exists=False)[source]¶ Download csv of results and check that the total file size is correct
- Parameters
output_file_path (File path) – Path of the file to output to
fail_if_exists (bool, optional) – If True, the download will fail if the target file already exists. Defaults to False.
- Raises
IncompleteRead – If only part of the file is downloaded, this is raised
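Example (a sketch of running a query and downloading the results; the query text, output path and config object are assumptions):
from comotion.dash import Query

query = Query(
    config = config,
    query_text = "select count(*) as n from v1_inforce_policies"
)

query.download_csv(
    output_file_path = 'results/inforce_count.csv',
    fail_if_exists = True    # avoid silently overwriting an existing file
)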
-
get_csv_for_streaming() → urllib3.response.HTTPResponse[source]¶ Returns a urllib3.response.HTTPResponse object that can be used for streaming. This allows use of the downloaded file without having to save it to local storage.
Be sure to use .release_conn() when completed to ensure that the connection is released.
This can be achieved using the with notation e.g.:
with query.get_csv_for_streaming().stream() as stream:
    for chunk in stream:
        # do something with chunk
        # chunk is a byte array
        ...
-
get_query_info() → comodash_api_client_lowlevel.models.query.Query[source]¶ Gets the state of the query.
- Returns
Model containing all query info, with the following attributes
- query
query sql
- query_id
query_id of query
- status
- completion_date_time
GMT Completion Time
- state
Current state of query. One of QUEUED,RUNNING,SUCCEEDED,FAILED,CANCELLED
- stateChangeReason
info about reason for state change (generally failure)
- submission_date_time
GMT submission time
- Return type
QueryInfo
-
is_complete() → bool[source]¶ Indicates whether the query is in a final state. This means it has either succeeded, failed or been cancelled.
- Returns
Whether query complete
- Return type
bool
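Example (a sketch of polling a query until it reaches a final state, then acting on the documented state values; assumes a Query instance named query):
import time

from comotion.dash import Query

while not query.is_complete():
    time.sleep(5)

query_info = query.get_query_info()
if query_info.state == Query.SUCCEEDED_STATE:
    query.download_csv(output_file_path = 'results.csv')
else:
    print(query_info.state)   # FAILED or CANCELLED; see get_query_info() for the reason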
-
-
comotion.dash.create_gzipped_csv_stream_from_df(df: pandas.core.frame.DataFrame) → _io.BytesIO[source]¶ Returns a gzipped, utf-8 csv file bytestream from a pandas dataframe
Useful to help upload dataframes to dash
It does not break the file up, so be sure to apply a maximum chunksize to the dataframe before calling this function, otherwise Dash's maximum file size limits will cause an error.
- Parameters
df (pd.DataFrame) – Dataframe to be turned into a bytestream
- Returns
The Bytestream
- Return type
io.BytesIO
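Example (a sketch of turning a DataFrame into a gzipped csv stream; the DataFrame contents are illustrative):
import pandas as pd
from comotion.dash import create_gzipped_csv_stream_from_df

df = pd.DataFrame({'policy_number': ['p1', 'p2'], 'face_amount': [10000, 25000]})
csv_gz_stream = create_gzipped_csv_stream_from_df(df)
# csv_gz_stream can now be passed to upload_csv_to_dash (see below)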
-
comotion.dash.read_and_upload_file_to_dash(file: Union[str, _io.FileIO], dash_table: str, dash_orgname: str, file_key: str = None, dash_api_key: str = None, check_sum: Optional[Dict[str, Union[int, float, str]]] = None, encoding: str = 'utf-8', chunksize: int = 30000, modify_lambda: Callable = None, path_to_output_for_dryrun: str = None, service_client_id: str = '0', partitions: Optional[List[str]] = None, load_type: str = 'APPEND_ONLY', data_model_version: str = None, entity_type: str = 'user', application_client_id: str = None, application_client_secret: str = None) → Union[List[Any], comotion.dash.DashBulkUploader][source]¶ Warning
This function will be deprecated on 1 December 2025. Please run a Migration and move to the Load/DashBulkUploader classes before then.
Reads a file and uploads it to Dash.
This function will:
- Read a CSV file
- Break it up into multiple CSVs, each with a maximum number of lines defined by chunksize
- Upload them to Dash
- Parameters
file (Union[str, io.FileIO]) – Either a path to the file to be uploaded, or a FileIO stream representing the file to be uploaded. Should be an unencrypted, uncompressed CSV file.
dash_table (str) – Name of the Dash table to upload the file to.
dash_orgname (str) – Orgname of your Dash instance.
file_key (str, optional) – A unique key for the file being uploaded. If not provided, a key will be generated.
dash_api_key (str, optional) – Valid API key for Dash API. Required for v1 data model uploads.
check_sum (Optional[Dict[str, Union[int, float, str]]], optional) – Checksum data for the files to be committed. The checksum should be a dictionary with Presto/Trino expressions as keys and their expected results as values.
encoding (str, default 'utf-8') – The encoding of the source file.
chunksize (int, default 30000) – The maximum number of lines to be included in each file. Note that this should be low enough that the zipped file is less than Dash’s maximum gzipped file size.
modify_lambda (Callable, optional) – A callable that receives the pandas DataFrame read from the CSV. Provides the opportunity to modify the DataFrame, such as adding a timestamp column.
path_to_output_for_dryrun (str, optional) – If specified, no upload will be made to Dash, but files will be saved to the location specified. This is useful for testing. Multiple files will be created (1 per chunk)
service_client_id (str, optional) – If specified, specifies the service client for the upload. See the Dash documentation for an explanation of the service client. https://docs.comotion.us/Comotion%20Dash/Analysts/How%20To/Prepare%20Your%20Data%20Model/Y%20Service%20Client%20and%20Row%20Level%20Security.html#service-client-and-row-level-security
partitions (Optional[List[str]], optional) – List of partitions for the load.
load_type (str, default 'APPEND_ONLY') – The type of load operation. Default is ‘APPEND_ONLY’.
data_model_version (str, optional) – The data model version to use for the upload. If not specified, the function will determine the version. If the migration status for the org is ‘Completed’, v2 is the model version. Otherwise, v1 is the model version. data_model_version only needs to be specified in exceptional circumstances where there are issues determining the migration status.
entity_type (str, default Auth.USER) – The entity type for authentication. Use Auth.USER if uploading as a user. Use Auth.APPLICATION if uploading with application credentials.
application_client_id (str, optional) – The application client ID for authentication.
application_client_secret (str, optional) – The application client secret for authentication.
- Returns
For v1 data model uploads, returns a list of HTTP responses. For v2 data model uploads, returns the DashBulkUploader instance.
- Return type
Union[List[Any], DashBulkUploader]
- Raises
ValueError – If the API key is not specified for v1 lake upload or if the file type cannot be determined.
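Example (a sketch of a straightforward call; the org name, table, path and checksum are assumptions, and the return value depends on the data model version as described above):
from comotion.dash import read_and_upload_file_to_dash

result = read_and_upload_file_to_dash(
    file = 'data/inforce_policies.csv',
    dash_table = 'v1_inforce_policies',
    dash_orgname = 'my_org_name',
    check_sum = {'count(*)': 50000},
    chunksize = 30000,
    service_client_id = '0'
)
# For a v2 data model org, result is the DashBulkUploader used for the upload.
# For a v1 data model org, dash_api_key must also be supplied and result is a list of responses.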
-
comotion.dash.upload_csv_to_dash(dash_orgname: str, dash_api_key: str, dash_table: str, csv_gz_stream: _io.FileIO, service_client_id: str = '0') → requests.models.Response[source]¶ Uploads csv gzipped stream to Dash
Expects a csv gzipped stream to upload to dash.
- Parameters
dash_orgname (str) – Dash organisation name for dash instance
dash_api_key (str) – Valid API key for the organisation instance
dash_table (str) – Table name to upload to
csv_gz_stream (io.FileIO) – Gzipped csv stream to upload to Dash
- Returns
response from dash api
- Return type
requests.Response
- Raises
HTTPError – If one is raised by the call
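Example (a sketch that combines this with create_gzipped_csv_stream_from_df; the org name, API key and table are assumptions):
from comotion.dash import upload_csv_to_dash

response = upload_csv_to_dash(
    dash_orgname = 'my_org_name',
    dash_api_key = 'my_api_key',
    dash_table = 'v1_inforce_policies',
    csv_gz_stream = csv_gz_stream,    # e.g. the stream produced by create_gzipped_csv_stream_from_df
    service_client_id = '0'
)
response.raise_for_status()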
-
comotion.dash.upload_from_oracle(sql_host: str, sql_port: int, sql_service_name: str, sql_username: str, sql_password: str, sql_query: str, dash_table: str, dash_orgname: str, dash_api_key: str, dtypes: dict = None, include_snapshot: bool = True, export_csvs: bool = True, chunksize: int = 50000, output_path: str = None, sep: str = '\t', max_tries: int = 5)[source]¶ Uploads data from an Oracle SQL database object to dash.
This function will:
- Get the total number of row chunks needed for the SQL query
- Get chunks of data from the SQL database
- Upload them to dash
- Append them to csv output (if specified)
- Save error chunks as csv (if any)
- Parameters
sql_host (str) – SQL host, e.g. ‘192.168.0.0.1’
sql_port (int) – SQL port number, e.g. 9005
sql_service_name (str) – SQL service name, e.g. ‘myservice’
sql_username (str) – SQL username
sql_password (str) – SQL password
sql_query (str) – SQL query
dash_table (str) – name of Dash table to upload the data to
dash_orgname (str) – orgname of your Dash instance
dash_api_key (str) – valid api key for Dash API
export_csvs (bool) – (optional) If True, data successfully uploaded is exported as csv. Defaults to True, i.e. output is included.
dtypes (dict) – (optional) A dictionary that contains the column names and data types to convert to. Defaults to None, i.e. load dataframe columns as they are.
chunksize (int) – (optional) The maximum number of lines to be included in each file. Note that this should be low enough that the zipped file is less than Dash's maximum gzipped file size. Defaults to 50000.
sep (str) – (optional) Field delimiter for the ComoDash table to upload the dataframe to. Defaults to '\t'.
output_path (str) – (optional) If specified, output csvs are saved in that location. If not, output is placed in the same location as the script. Defaults to None.
include_snapshot (bool) – (optional) If True, an additional column ‘snapshot_timestamp’ will be added to the DataFrame. This column will contain the time that data was loaded, in “%Y-%m-%d %H:%M:%S.%f” format, in order to help with database management. Defaults to True, i.e. snapshot_timestamp is included.
max_tries (int) – (optional) Maximum number of times to retry if there is an HTTP error. Defaults to 5.
- Returns
DataFrame with errors
- Return type
pd.DataFrame
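Example (a sketch of pulling from Oracle and uploading in chunks; connection details, query and table are placeholder assumptions):
from comotion.dash import upload_from_oracle

error_df = upload_from_oracle(
    sql_host = '192.168.0.1',
    sql_port = 1521,
    sql_service_name = 'myservice',
    sql_username = 'my_user',
    sql_password = 'my_password',
    sql_query = 'select * from inforce_policies',
    dash_table = 'v1_inforce_policies',
    dash_orgname = 'my_org_name',
    dash_api_key = 'my_api_key',
    chunksize = 50000,
    include_snapshot = True,
    export_csvs = True
)
print(error_df)   # DataFrame of chunks that failed to upload, if any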
-
comotion.dash.v1_upload_csv(file: Union[str, _io.FileIO], dash_table: str, dash_orgname: str, dash_api_key: str, encoding: str = 'utf-8', chunksize: int = 30000, modify_lambda: Callable = None, path_to_output_for_dryrun: str = None, service_client_id: str = '0')[source]¶ Warning
This function is deprecated. Use read_and_upload_file_to_dash instead.
Reads a file and uploads it to Dash.
This function will:
- Read a csv file
- Break it up into multiple csvs, each with a maximum number of lines defined by chunksize
- Upload them to dash
- Parameters
file (Union[str, io.FileIO]) – Either a path to the file to be uploaded, or a FileIO stream representing the file to be uploaded. Should be an unencrypted, uncompressed CSV file.
dash_table (str) – name of Dash table to upload the file to
dash_orgname (str) – orgname of your Dash instance
dash_api_key (str) – valid api key for Dash API
encoding (str) – The encoding of the source file. Defaults to utf-8.
chunksize (int) – (optional) The maximum number of lines to be included in each file. Note that this should be low enough that the zipped file is less than Dash's maximum gzipped file size. Defaults to 30000.
modify_lambda – (optional) A callable that receives the pandas dataframe read from the csv. Gives the opportunity to modify the dataframe, such as adding a timestamp column. Not required.
path_to_output_for_dryrun (str) – (optional) If specified, no upload will be made to dash, but files will be saved to the location specified. This is useful for testing. Multiple files will be created: [table_name].[i].csv.gz where i represents multiple file parts.
service_client_id (str) – (optional) if specified, specifies the service client for the upload. See the dash documentation for an explanation of service client. https://docs.comotion.us/Comotion%20Dash/Analysts/How%20To/Prepare%20Your%20Data%20Model/Y%20Service%20Client%20and%20Row%20Level%20Security.html#service-client-and-row-level-security
- Returns
List of http responses
- Return type
List