Getting Started with Comotion Python SDK

This documentation helps you get set up with and use the Comotion Python SDK. It includes this getting started page as well as full documentation of the SDK functions.

This is an open source project, and can be contributed to by the community.

The full Comotion documentation portal can be found here.

Installation

Install the comotion-sdk in your python environment using pip:

pip install comotion-sdk

If you have already installed the SDK, ensure it is up to date by running:

pip install --upgrade comotion-sdk

Authentication

Logging In

The Comotion Dash API is built on v2 of the Comotion API, which uses a new way to authenticate. You do not need an API key; instead, you log in with your normal username and password.

To do this, once you have installed the SDK, authenticate from the command line by typing the following:

> comotion authenticate

You will be prompted for your orgname, which is your organisation’s unique name, and then a browser will open for you to log in.

Once this process is complete, the relevant keys will automatically be saved in your computer’s credentials manager.

Org Name Set-up

To avoid being prompted for your orgname every time, you can save it in the COMOTION_ORGNAME environment variable:

> export COMOTION_ORGNAME=orgname
> comotion authenticate

or include it directly in the command line:

> comotion -o orgname authenticate

Uploading Data to Dash: Data Model v2

The following upload toolsets are available once you are on data model v2. Remember to authenticate if you haven’t in a while (see Logging In).

For more information on migrating, see Migrating to Data Model v2.

Hint

It is helpful to track the load_id of the loads you create.

This is a unique identifier for the load, and can be used to re-create the load if needed.

Comotion can also provide support easily if the load_id is provided.

DashBulkUploader

The DashBulkUploader class is the recommended way to upload data to Dash. This uses the Load class, but provides some default functionality and the uploads attribute to manage multiple loads.

If more control is needed over the upload process, the Load class can be used instead.

The process can be summarized as follows:

  1. Create a DashBulkUploader object

  2. Add loads to the uploader

  3. Add data sources to each load

  4. Execute the uploads

  5. Check the status of the loads

Below is a simple example of how to use the DashBulkUploader class.

from comotion.dash import DashBulkUploader, DashConfig
from comotion.auth import Auth

# 1. Create the uploader object
config = DashConfig(Auth(orgname = 'my_org_name'))
uploader = DashBulkUploader(config = config)

# 2. Add a Load -> essentially creates a Load object and adds it to the uploader
lake_tables_and_sources = [
    ('v1_policies', 'data/inforce_policies'),
    ('v1_claims', 'data/claims'),
    ('v1_customers', 'data/customers')
]

for my_lake_table, source_data in lake_tables_and_sources:
   uploader.add_load(
      table_name=my_lake_table,
      load_as_service_client_id='0',
      track_rows_uploaded=True  # This automatically creates a check_sum {'count(*)': x} where x is the number of rows in the source data.
      # See cheat sheet for more options available for the load.
   )

   # 3. Add data source(s) to the load. At least one data source is required, but multiple can be added to the same load.
   uploader.add_data_to_load(
      table_name=my_lake_table,
      data=source_data  # Can be a path to a csv, parquet or directory, a pandas dataframe or a dash.Query object
      # See cheat sheet for more options available for the data source.
   )

# 4. Execute the uploads - this commits the loads.
print(uploader.uploads)
uploader.execute_all_uploads() # You can also execute specific uploads with execute_multiple_uploads() and execute_upload()

# 5. Check the status of the loads
print(uploader.get_load_info())

See the v2 Upload Cheat sheet for the different configuration options available, and whether they should be specified for the load or for the data source.

If your data source has specific read requirements, see Pandas Usage for more information on how to use pandas arguments to handle your use case.

Hint

If a path to a directory is specified as a data source, all valid files in the directory will be uploaded to the lake table specified.

This allows you to chunk files and store them in the same folder for more efficient extraction and upload (see the sketch below).

Note this is not an option for the Load class.
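
For example, here is a minimal sketch of chunking a large extract into parquet files in a single folder and then adding that folder as one data source. The file paths and chunk size are illustrative, and it assumes the uploader and the v1_claims load from the example above (and a parquet engine such as pyarrow for to_parquet):

import os
import pandas as pd

# Write the large extract out as several parquet files in one folder
os.makedirs('data/claims_chunks', exist_ok=True)
for i, chunk in enumerate(pd.read_csv('data/claims_full_extract.csv', chunksize=500_000)):
   chunk.to_parquet(f'data/claims_chunks/claims_{i}.parquet', index=False)

# Add the whole folder as a single data source - every valid file in it will be uploaded to the lake table
uploader.add_data_to_load(
   table_name='v1_claims',
   data='data/claims_chunks'
)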

Obtaining the file_key to remove data from a load

The file_key parameter is used to avoid duplicating a data source within a load. This is automatically generated if not provided.

You can also use the uploader.remove_load() or uploader.remove_data_from_load() functions to remove loads or data sources from the uploader respectively.

The remove_data_from_load function requires a file key to be specified, which can be retrieved from the uploader.uploads object.
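
For example, here is a minimal sketch of specifying a file_key and then removing the data source or the load again. It assumes the uploader from the example above, and that remove_data_from_load and remove_load are keyed by table name in the same way as add_data_to_load and add_load:

# Add a data source with an explicit (hypothetical) file key so it is easy to refer to later
uploader.add_data_to_load(
   table_name='v1_claims',
   data='data/claims/claims_2024.csv',
   file_key='claims_2024'  # auto-generated if omitted; inspect uploader.uploads to find generated keys
)

# Remove just that data source from the load...
uploader.remove_data_from_load(table_name='v1_claims', file_key='claims_2024')

# ...or remove the whole load from the uploader
uploader.remove_load(table_name='v1_claims')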

Load

The Load class is the primary class for uploading data to Dash. The process of using this class can be outlined as follows:

  1. Create a Load object

  2. Upload data sources to the load

  3. Commit the load with a valid check sum

  4. Check the status of the load

from comotion.dash import Load, DashConfig, Query
from comotion.auth import Auth
import pandas as pd

# 1. Create the load object and data source variables
dashconfig = DashConfig(Auth(orgname = 'my_org_name'))

load = Load(config = dashconfig,
            load_type = 'APPEND_ONLY',
            table_name = 'v1_inforce_policies',
            load_as_service_client_id = '0',
            track_rows_uploaded = True
            # See cheat sheet for more options available for the load.
            )
print("Load ID: " + load.load_id) # It is helpful to track the load_id

# 2. Upload data sources to the load. At least one data source is required, but multiple can be added to the same load.

# Several examples shown for demonstration, but typically only one of these functions will be required.
# 2.1: Upload a pandas dataframe. This is useful if your source is not part of the standard data sources supported.
df_for_upload = pd.read_sql(query, connection)  # 'query' and 'connection' here are placeholders for your own source query and database connection
load.upload_df(
   data = df_for_upload
)
# 2.2: Upload a file. This can be a csv, parquet, json or excel file, or an io file stream.
path_to_upload_file = 'data/inforce_policies.csv'
load.upload_file(
   data = path_to_upload_file
)
# 2.3: Upload a dash.Query object. This is useful, for example, for moving data from one lake table to another.
dash_query = Query(query_text = 'select * from orgname_lake_v2.v1_staging_policies', config = dashconfig)
load.upload_dash_query(# Uploads the result of a dash.Query object.
   data = dash_query
)

# 3. Commit the load with a valid check sum.
load.commit(
   check_sum = {
      'sum(salary)': 12345
   }
)

# 4. Check the status of the load
print(load.wait_to_complete())
load_info = load.get_load_info()
print(load_info)

See the v2 Upload Cheat sheet for different configuration options available.

You can also re-create an existing load if you have the correct load_id.

load = Load(config = dashconfig,
            load_id = 'load_id'
            # The other arguments should not be specified when re-creating a load.
            )

Uploading Data to Dash: Data Model v1

Note

Comotion released a new version of our data model in 2025. We refer to the original version as Data Model v1, and the new version as Data Model v2.

Uploading a file to Dash

The read_and_upload_file_to_dash function reads a csv file, breaks it up, gzips the resulting files and pushes them to the Dash API.

# Break up and upload file to Dash

from comotion import dash
from getpass import getpass

# set relevant parameters
dash_orgname = 'my_org_name'
dash_api_key = ''
dash_api_key = getpass(
    prompt='Enter your ' + dash_orgname + '.comodash.io api key:'
) # this prompts the user to enter the api key

dash.read_and_upload_file_to_dash(
    './path/to/csv/file.csv',
    dash_table='my_table_name',
    dash_orgname=dash_orgname,
    dash_api_key=dash_api_key
)

Custom Upload Usage

Testing and debugging an upload script

To check that your script is working, you can run a dry run. This saves the files locally rather than uploading them to Dash, so that you can check the result before uploading.

# First test the upload using the dry_run feature

from comotion import dash
from getpass import getpass

# set relevant parameters
dash_orgname = 'my_org_name'
dash_api_key = ''
dash_api_key = getpass(
    prompt='Enter your ' + dash_orgname + '.comodash.io api key:'
) # this prompts the user to enter the api key

dash.read_and_upload_file_to_dash(
    './path/to/csv/file.csv',
    dash_table='my_table_name',
    dash_orgname=dash_orgname,
    dash_api_key=dash_api_key,
    path_to_output_for_dryrun='./outputpath/'
)

Instead of uploading, this will output the files that would have been uploaded to ./outputpath/. If the file to be uploaded is large, it will be broken up and all resulting files will be placed in the output path.

Pandas Usage

Using this SDK in conjunction with pandas provides a powerful toolset for integrating with any source.

Hint

Uploads are always converted to a pandas.DataFrame when using the Load and DashBulkUploader classes. This allows you to provide valid pandas arguments in the Load.upload_file(), Load.upload_dash_query() and DashBulkUploader.add_data_to_load() functions.

For production uploads, the dtype argument is highly recommended (see pandas dtypes).

The following 3 examples are equivalent ways to upload data into Dash. Notice how the exact same arguments which can be passed to a pandas read function can be passed to the SDK classes:

# 1: Using pandas arguments in the Load class
load.upload_file(
   data = 'my_data.csv',
   dtype = {'column_1': 'int', 'column_2': 'object'},
   sep = ';',
   encoding = 'utf-16'
   # Could add any more valid pandas arguments here
)

# 2: Using pandas arguments in the DashBulkUploader class
uploader.add_data_to_load(
   table_name = my_lake_table,
   data = 'my_data.csv',
   dtype = {'column_1': 'int', 'column_2': 'object'},
   sep = ';',
   encoding = 'utf-16'
   # Could add any more valid pandas arguments here
)

# 3: Read with pandas and upload the resulting df with the SDK (assumes pandas is imported as pd)
df = pd.read_csv('my_data.csv',
                 dtype = {'column_1': 'int', 'column_2': 'object'},
                 sep = ';',
                 encoding = 'utf-16')

# 3.1: Then upload a df with the DashBulkUploader
uploader.add_data_to_load(
   table_name = my_lake_table,
   data = df
)

# OR 3.2: Upload a df with the Load class
load.upload_df(
   data = df
)

But there are many more pandas arguments available to read your source correctly. Refer to the applicable pandas read function documentation in the table below to see the available arguments for your source data.

(Almost) All of the arguments available in the applicable read function can be used with the SDK.

Pandas Read Options for Source Types

Data Source Type    Pandas Read Function Documentation
CSV                 pd.read_csv
Excel               pd.read_excel
JSON                pd.read_json
Parquet             pd.read_parquet
Other               See example 3 above. First read the data into a pandas dataframe using the appropriate pandas read function, and then use the Load.upload_df() or DashBulkUploader.add_data_to_load() function to upload it.

Note

The following pandas arguments should not be provided as they are used by the default functionality of the SDK: ['filepath_or_buffer', 'chunksize', 'nrows', 'path', 'path_or_buf', 'io'].

Hint

Often you will want to process your source data before uploading. For example, you may want to add metadata to the data, or hash PII before uploading.

Simple operations on the source data can be applied for the upload using the modify_lambda parameter. It accepts a python function that receives a pandas.DataFrame.

Here is a simple example of adding a timestamp as a column called snapshot_timestamp to the csv. The function provided can apply several manipulations to the data if required:

# Break up and upload file to Dash with Extra Column

from comotion import dash
from getpass import getpass
from datetime import datetime
import pandas as pd

# define the function used to modify the file

myTimeStamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")

def addTimeStamp(df: pd.DataFrame): # The type hint for df is optional, but shown for demonstration
   df['snapshot_timestamp'] = myTimeStamp
   # Can provide more manipulations in this function if needed

# set relevant parameters

dash_orgname = 'my_org_name'
dash_api_key = ''
dash_api_key = getpass(
   prompt='Enter your ' + dash_orgname + '.comodash.io api key:'
) # this prompts the user to enter the api key

dash.read_and_upload_file_to_dash(
   './path/to/csv/file.csv',
   dash_table='my_table_name',
   dash_orgname=dash_orgname,
   dash_api_key=dash_api_key,
   modify_lambda=addTimeStamp
)
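
As a second example, here is a minimal sketch of hashing a PII column with modify_lambda before an upload to the v2 lake. The id_number column, table name and file path are hypothetical, and modify_lambda is passed to the Load constructor as described in the cheat sheet (it can equally be passed to DashBulkUploader.add_load):

# Hash a PII column before upload

import hashlib
import pandas as pd

from comotion.auth import Auth
from comotion.dash import DashConfig, Load

def hash_id_number(df: pd.DataFrame):
   # Replace the raw (hypothetical) id_number values with a SHA-256 digest so raw PII never reaches the lake
   df['id_number'] = df['id_number'].astype(str).map(
      lambda value: hashlib.sha256(value.encode('utf-8')).hexdigest()
   )

dashconfig = DashConfig(Auth(orgname = 'my_org_name'))

load = Load(config = dashconfig,
            load_type = 'APPEND_ONLY',
            table_name = 'v1_customers',
            load_as_service_client_id = '0',
            track_rows_uploaded = True,
            modify_lambda = hash_id_number  # applied to the data before it is uploaded
            )

load.upload_file(data = 'data/customers.csv')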

Migrating to Data Model v2

You can migrate to data model v2 with the SDK. First you will need to run a flash schema migration.

from comotion.dash import DashConfig, Migration
from comotion.auth import Auth

config = DashConfig(Auth("myorgname"))

migration = Migration(config = config)
migration.start(migration_type = 'FLASH_SCHEMA') # This is the default value so it does not need to be specified

This copies the schema of all your lake tables from the v1 lake to the v2 lake.

Now you will be able to test your ETL on the new lake, using the read_and_upload_file_to_dash function, DashBulkUploader class or Load class.

Once you are happy that your ETL will work with the new lake, you can run a full migration.

migration.start(migration_type = 'FULL_MIGRATION',
                clear_out_new_lake = True)

This clears out the v2 lake and runs the full migration of data. If clear_out_new_lake is False and there is data in the v2 lake, the full migration will fail.

Good practice when migrating would be:

  1. Run the flash schema migration

  2. Test your ETL on the new lake until satisfied.

  3. Stop ETL processes to the old lake.

  4. Run the full migration (the length of time for this depends on the amount of data in your lake, but it should not take more than a couple of hours).

  5. Deploy your new ETL processes.

Once the full migration is run, you can no longer upload to the v1 lake.

You can check on the migration status at any time:

print(migration.status())

read_and_upload_file_to_dash

Warning

Comotion will deprecate the read_and_upload_file_to_dash function on 1 December 2025, along with Data Model v1.

Once you are operating in the new lake, we recommend moving to the new Load and DashBulkUploader classes for uploads. Reach out to us for assistance at dash@comotion.co.za.

The read_and_upload_file_to_dash function can also be used to upload to Data Model v2. See the section above on using this function.

The key considerations when uploading to the v2 data model with this function are as follows:

  • The file parameter can now accept a path to a csv, excel, json or parquet file or a directory, a pandas.DataFrame or a dash.Query object.

  • The dash_api_key is no longer required. Instead you will have to run the following in your command line to authenticate: comotion authenticate

  • You will need to specify data_model_version = 'v2' if you have not run a full migration (see Migrating to Data Model v2 above)

  • The function will return a DashBulkUploader object.

Example of ETL change for Data Model v2

Python for v1 Upload

from comotion.dash import read_and_upload_file_to_dash
from datetime import datetime

# In this example, we use the modify_lambda argument to add a column to the end of the lake table to mark the timestamp when the upload was started
def add_upload_timestamp(df):
   upload_timestamp = datetime.now()
   df['upload_timestamp'] = upload_timestamp.strftime('%Y-%m-%d %H:%M:%S')

upload_response = read_and_upload_file_to_dash(
   file = 'path/to/file.csv',
   dash_table = 'my_lake_table_name',
   dash_orgname = 'my_dash_orgname',
   dash_api_key = 'my_api_key',
   modify_lambda = add_upload_timestamp # Function defined above
)

Python for v2 Upload

from comotion.dash import read_and_upload_file_to_dash
from datetime import datetime

# In this example, we use the modify_lambda argument to add a column to the end of the lake table to mark the timestamp when the upload was started
def add_upload_timestamp(df):
   upload_timestamp = datetime.now()
   df['upload_timestamp'] = upload_timestamp.strftime('%Y-%m-%d %H:%M:%S')
   df['data_import_batch'] = upload_timestamp.strftime('%Y-%m-%d') # First change: Add the data import batch column.

upload_response = read_and_upload_file_to_dash(
   file = 'path/to/file.csv',
   dash_table = 'my_lake_table_name',
   dash_orgname = 'my_dash_orgname',
   # API key is no longer required for the upload
   modify_lambda = add_upload_timestamp, # Function defined above
   data_model_version = 'v2' # Add this parameter to force an attempted upload to the new lake
)
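
Because the v2 call returns a DashBulkUploader object, you can check the status of the resulting load in the same way as shown earlier, for example:

# Inspect the load(s) created by the v2 upload
print(upload_response.get_load_info())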

Running Queries and Extracting Data

You can use the SDK to run queries on Dash, as well as download the results in csv format.

Running a query

You can then use the Query class in the comotion.dash module to create a query and download the results. Here is an example code snippet. You do not need to authenticate in your code; the Auth class takes care of that.

from comotion.dash import DashConfig
from comotion.auth import Auth
from comotion.dash import Query

config = DashConfig(Auth("myorgname"))

# this step creates the query object and kicks off the query
query = Query(query_text="select 1", config=config)

# this step blocks until the query is complete and provides the query metadata
final_query_info = query.wait_to_complete()


with open("myfile.csv", "wb") as file:
   with query.get_csv_for_streaming() as response:
      for chunk in response.stream(524288):
         file.write(chunk)

Authentication Using Client Secret

We have introduced an additional method of authenticating to the Comotion API using a client secret. This allows applications to consume the API without a user.

User vs Application Authentication

If your users are logging into an application which needs to consume data from Dash, a better approach is to integrate that app with our single sign-on functionality.

That application can then use an access token generated for that user to consume the Dash API. This provides better security, as Dash will be aware of the user and will obey things like Service Client data siloing.

This section deals with the scenario where the application itself is acting as the user, which is useful for various data extraction and loading scenarios.

To use this functionality, the Dash team will need to create a new client and supply a client_id and secret key that will be used to authenticate. Get in touch with dash@comotion.co.za.

Once that is done, your application can authenticate with the secret key using the application_client_id and application_client_secret parameters, as shown in the example below.

from comotion.dash import DashConfig
from comotion.auth import Auth
from comotion.dash import Query

## Authenticate as an application
auth = Auth(
     entity_type=Auth.APPLICATION,
     application_client_id="test",
     application_client_secret="2shjkdfbjsdhfg2893847",
     # best to pass to your application from an appropriate secrets manager or environment variable
     orgname="myorgname"
)
config = DashConfig(auth)

# this step creates the query object and kicks off the query
query = Query(query_text="select 1", config=config)

# this step blocks until the query is complete and provides the query metadata
final_query_info = query.wait_to_complete()

if final_query_info.status.state == 'FAILED':
   # deal with failure scenario
   raise Exception(f'Error while retrieving Dash data: Query failed; {final_query_info.status.state_change_reason}')

elif final_query_info.status.state == 'CANCELLED':
   # deal with cancelled scenario
   raise Exception(f'Error while retrieving Dash data: Query cancelled; {final_query_info.status.state_change_reason}')

else:
   # deal with success scenario
   with open("myfile.csv", "wb") as file:
      with query.get_csv_for_streaming() as response:
         for chunk in response.stream(524288):
            file.write(chunk)

To process the output data further instead of writing to a csv file, load the data into a dataframe:

import io
import pandas as pd

response = query.get_csv_for_streaming()
data = io.StringIO(response.data.decode('utf-8'))
df = pd.read_csv(data)
# further processing...

Data Zones

If an upload or query needs to be run in a specific data zone, the zone argument can be specified when creating the DashConfig object.

from comotion.dash import DashConfig, Query, Load
from comotion.auth import Auth

zone_1_config = DashConfig(Auth("myorgname"), zone = 'zone_1')
zone_2_config = DashConfig(Auth("myorgname"), zone = 'zone_2')

# This runs the query in zone_1
query = Query(query_text="select 1", config=zone_1_config)

# This creates a load in zone_2
load = Load(config = zone_2_config,
            load_type = 'APPEND_ONLY',
            table_name = 'v1_inforce_policies',
            load_as_service_client_id = '0',
            track_rows_uploaded = True
            )

v2 Upload Cheat sheet

Below is a cheat sheet for the important arguments available for the DashBulkUploader and Load classes.

Argument Cheat Sheet

config
  Required: Yes
  Description: DashConfig object.
  DashBulkUploader: pass when calling DashBulkUploader
  Load: pass when calling Load

table_name
  Required: Yes
  Description: Name of the lake table to upload to.
  DashBulkUploader: pass when calling DashBulkUploader.add_load
  Load: pass when calling Load

load_type
  Required: Yes for Load, No for DashBulkUploader
  Description: Type of load to create. Currently only ‘APPEND_ONLY’ is supported.
  DashBulkUploader: pass when calling DashBulkUploader.add_load
  Load: pass when calling Load

load_as_service_client_id
  Required: No if a service_client_id field is created in the data source, otherwise Yes.
  Description: Service client id to add to each row of data in the Load.
  DashBulkUploader: pass when calling DashBulkUploader.add_load
  Load: pass when calling Load

partitions
  Required: No
  Description: List of fields to partition on.
  DashBulkUploader: pass when calling DashBulkUploader.add_load
  Load: pass when calling Load

load_id
  Required: No
  Description: Load id to use when re-creating a load.
  DashBulkUploader: pass when calling DashBulkUploader.add_load
  Load: pass when calling Load

track_rows_uploaded
  Required: No
  Description: Indicates whether a row count check sum should be created for the load automatically.
  DashBulkUploader: pass when calling DashBulkUploader.add_load
  Load: pass when calling Load

path_to_output_for_dryrun
  Required: No
  Description: Path to the directory to save the files to when running a test run, i.e. not uploading to Dash.
  DashBulkUploader: pass when calling DashBulkUploader.add_load
  Load: pass when calling Load
  See also: Testing and debugging an upload script

modify_lambda
  Required: No
  Description: Function to modify the data before uploading. This function should take a pandas dataframe as input and manipulate the dataframe in place as needed.
  DashBulkUploader: pass when calling DashBulkUploader.add_load
  Load: pass when calling Load
  See also: Pandas Usage

chunksize
  Required: No
  Description: Size of the chunks to break the data into for uploading. This is a legacy argument, but a smaller chunksize can be specified if required.
  DashBulkUploader: pass when calling DashBulkUploader.add_load
  Load: pass when calling Load

check_sum
  Required: Yes (or specify track_rows_uploaded = True)
  Description: Check sum to use for the load. This is a dictionary of SQL aggregations and expected values. The SQL aggregation is run on the unioned, combined data sources, and if the result does not match the expected value, the load is rejected.
  DashBulkUploader: pass when calling DashBulkUploader.add_load
  Load: pass when calling Load.commit

data
  Required: Yes
  Description: Specification of the data source to be uploaded. The exact type depends on the function used to upload the data and the source data format.
  DashBulkUploader: pass when calling DashBulkUploader.add_data_to_load
  Load: pass when calling Load.upload_df, Load.upload_file or Load.upload_dash_query
  Comment: A directory can only be specified as a data source using the DashBulkUploader.add_data_to_load function.

file_key
  Required: No
  Description: Unique key to identify the data source. This is automatically generated if not provided.
  DashBulkUploader: pass when calling DashBulkUploader.add_data_to_load
  Load: pass when calling Load.upload_df, Load.upload_file or Load.upload_dash_query
  Comment: Used to avoid duplicating a data source within a load, and to remove data sources from a load in the DashBulkUploader class.

**pd_read_kwargs
  Required: No
  Description: Placeholder for additional arguments to pass to the pandas read function, e.g. dtype, sep, sheet_name, skiprows.
  DashBulkUploader: pass when calling DashBulkUploader.add_data_to_load
  Load: pass when calling Load.upload_file or Load.upload_dash_query
  See also: Pandas Usage