import io
import os
import json
import requests
import csv
import time
from typing import Union, Callable, List, Optional, Dict, Any
from os.path import join, basename, isdir, isfile, splitext
from os import listdir
import logging
logger = logging.getLogger(__name__)
import pandas as pd
try:
import pyarrow as pa
import pyarrow.parquet as pq
except ImportError:
pa = None
pq = None
logger.warning("Optional dependency 'pyarrow' is not installed; Arrow/Parquet features are unavailable.")
try:
import boto3
except ImportError:
boto3 = None
logger.warning("Optional dependency 'boto3' is not installed; AWS features are unavailable.")
try:
import awswrangler as wr
except ImportError:
wr = None
logger.warning("Optional dependency 'awswrangler' is not installed; AWS Wrangler features are unavailable.")
import re
import uuid
try:
import cx_Oracle
except ImportError:
cx_Oracle = None
logger.warning("Optional dependency 'cx_Oracle' is not installed; Oracle-related features are unavailable.")
try:
import sqlalchemy
except ImportError:
sqlalchemy = None
logger.warning("Optional dependency 'sqlalchemy' is not installed; SQLAlchemy-related features are unavailable.")
try:
from tqdm import tqdm
except ImportError:
tqdm = None
logger.warning("Optional dependency 'tqdm' is not installed; progress bars are unavailable.")
from datetime import datetime, timedelta
from comotion import Auth
import comodash_api_client_lowlevel
from comodash_api_client_lowlevel import QueriesApi, LoadsApi, MigrationsApi
from comodash_api_client_lowlevel.models.query_text import QueryText
from urllib3.exceptions import IncompleteRead
from urllib3.response import HTTPResponse
from comodash_api_client_lowlevel import Load
from comodash_api_client_lowlevel.models.query import Query as QueryInfo
from comodash_api_client_lowlevel.models.load import Load as LoadInfo
from comodash_api_client_lowlevel.models.file_upload_request import FileUploadRequest
from comodash_api_client_lowlevel.models.file_upload_response import FileUploadResponse
from comodash_api_client_lowlevel.models.load_commit import LoadCommit
from comodash_api_client_lowlevel.models.load import Load
from comodash_api_client_lowlevel.models.query_id import QueryId
from comodash_api_client_lowlevel.rest import ApiException
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import string
from inspect import signature, Parameter
class DashConfig(comodash_api_client_lowlevel.Configuration):
"""
Object containing configuration information for Dash API
Attributes
----------
auth : comotion.Auth
comotion.Auth object holding information about authentication
zone: str, optional
The zone to use for the API. If not provided, defaults to None, i.e. the main zone.
"""
def __init__(self, auth: Auth, zone: str = None):
if not(isinstance(auth, Auth)):
raise TypeError("auth must be of type comotion.Auth")
self.auth = auth
self.orgname = auth.orgname
self.zone = zone
host_url = 'https://%s.api.comodash.io/v2' % (self.orgname) if not zone else 'https://%s.%s.api.comodash.io/v2' % (self.zone, self.orgname)
super().__init__(
host=host_url,
access_token=None
)
# comodash_api_client_lowlevel.Configuration.set_default(config)
def _check_and_refresh_token(self):
"""
checks whether the access token is still valid otherwise refreshes it
"""
import jwt
# If there's no token, get one
if not self.access_token:
self.access_token = self.auth.get_access_token()
return
# Decode the token payload without verifying the signature; we only need the expiry claim here
try:
payload = jwt.decode(self.access_token, options={"verify_signature": False})
# Get the current time and the expiration time from the token, both in UTC
now = datetime.utcnow()
exp = datetime.utcfromtimestamp(payload['exp'])
# If the token has expired, refresh it
if now >= (exp - timedelta(seconds=30)):
self.access_token = self.auth.get_access_token()
except jwt.ExpiredSignatureError:
# If the token is expired, refresh it
self.access_token = self.auth.get_access_token()
except jwt.DecodeError as e:
# Handle cases for invalid token
self.access_token = self.auth.get_access_token()
except KeyError:
# Handle cases for tokens with unexpected payload
self.access_token = self.auth.get_access_token()
def auth_settings(self):
"""Gets Auth Settings dict for api client.
:return: The Auth Settings information dict.
"""
# This overrides the lowlevel configuration object to allow for automatic refresh of access token
self._check_and_refresh_token()
return super().auth_settings()
class Query():
"""
The query object starts and tracks a query on Comotion Dash.
Initialising this class runs a query on Comotion Dash and stores the
resulting query id in `query_id`
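Example
-------
An illustrative sketch (the orgname and SQL are placeholders):
.. code-block:: python
config = DashConfig(Auth('myorgname'))
query = Query(config=config, query_text='select * from my_table limit 10')
query.wait_to_complete() # blocks until the query reaches a final state
print(query.state()) # one of SUCCEEDED, FAILED or CANCELLED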
"""
COMPLETED_STATES = ['SUCCEEDED', 'CANCELLED', 'FAILED']
SUCCEEDED_STATE = 'SUCCEEDED'
def __init__(
self,
config: DashConfig,
query_text: str = None,
query_id: str = None
):
"""
Parameters
----------
query_text : str
sql of the query to run
config : DashConfig
Object of type DashConfig including configuration details
query_id : str, optional
Query id of existing query. If not provided, then a new query will be started on Dash
Raises
------
TypeError
If config is not of type DashConfig
ValueError
if one of query_id or query_text is not provided
"""
if not(isinstance(config, DashConfig)):
raise TypeError("config must be of type comotion.dash.DashConfig")
self.config = config
with comodash_api_client_lowlevel.ApiClient(self.config) as api_client:
self.query_api_instance = QueriesApi(api_client)
if query_id:
# query_info = self.query_api_instance.get_query(query_id)
self.query_id = query_id
# self.query_text = query_info.query
elif query_text:
self.query_text = query_text
query_text_model = QueryText(query=query_text)
try:
query_id_model = self.query_api_instance.run_query(query_text_model) # noqa
except comodash_api_client_lowlevel.exceptions.BadRequestException as exp:
raise ValueError(json.loads(exp.body)['message'])
self.query_id = query_id_model.query_id
else:
raise ValueError("One of query_id or query_text must be provided")
def refresh_api_instance(self):
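"""Re-creates the DashConfig and queries API client for this query, refreshing the underlying authentication."""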
zone = self.config.zone
auth_token = self.config.auth
orgname = auth_token.orgname
entity_type = auth_token.entity_type
if entity_type == Auth.APPLICATION:
application_client_id = auth_token.application_client_id
application_client_secret = auth_token.application_client_secret
else:
application_client_id = None
application_client_secret = None
self.config = DashConfig(
Auth(
orgname=orgname,
entity_type=entity_type,
application_client_id=application_client_id,
application_client_secret=application_client_secret
),
zone = zone
)
with comodash_api_client_lowlevel.ApiClient(self.config) as api_client:
# Create an instance of the API class with provided parameters
self.query_api_instance = QueriesApi(api_client)
def get_query_info(self) -> QueryInfo:
"""Gets the state of the query.
Returns
-------
QueryInfo
Model containing all query info, with the following attributes
`query`
query sql
`query_id`
query_id of query
`status`
`completion_date_time`
GMT Completion Time
`state`
Current state of query. One of QUEUED,RUNNING,SUCCEEDED,FAILED,CANCELLED
`stateChangeReason`
info about reason for state change (generally failure)
`submission_date_time`
GMT submission time
"""
try:
self.refresh_api_instance()
return self.query_api_instance.get_query(self.query_id)
except comodash_api_client_lowlevel.exceptions.NotFoundException as exp:
raise ValueError("query_id cannot be found")
def state(self) -> str:
"""Gets the state of the query.
Returns
-------
str
One of QUEUED,RUNNING,SUCCEEDED,FAILED,CANCELLED
"""
return self.get_query_info().status.state
def is_complete(self) -> bool:
"""Indicates whether the query is in a final state.
This means it has either succeeded, failed or been cancelled.
Returns
-------
bool
Whether query complete
"""
return self.state() in Query.COMPLETED_STATES
def wait_to_complete(self) -> QueryInfo:
"""Blocks until query is in a complete state
Returns
-------
QueryInfo
Query info once the query has reached a final state, one of 'SUCCEEDED', 'CANCELLED' or 'FAILED'
"""
while True:
query_info = self.get_query_info()
if query_info.status.state in Query.COMPLETED_STATES:
return query_info
time.sleep(5)
def query_id(self) -> str:
"""Returns query id for this query
"""
return self.query_id
def get_csv_for_streaming(self) -> HTTPResponse:
""" Returns a ``urllib3.response.HTTPResponse`` object that can be used for streaming
This allows use of the downloaded file without having to save
it to local storage.
Be sure to use ``.release_conn()`` when completed to ensure that the
connection is released
This can be achieved using the `with` notation e.g.::
with query.get_csv_for_streaming().stream() as stream:
for chunk in stream:
# do something with chunk
# chunk is a byte array
"""
self.refresh_api_instance()
response = self.query_api_instance.download_csv_without_preload_content(
query_id=self.query_id)
response.autoclose = False
return response
def download_csv(self, output_file_path, fail_if_exists=False):
"""Download csv of results and check that the total file size is correct
Parameters
----------
output_file_path : File path
Path of the file to output to
fail_if_exists : bool, optional
If True, the download will fail if the target file already exists.
Defaults to False.
Raises
------
IncompleteRead
If only part of the file is downloaded, this is raised
"""
with self.get_csv_for_streaming() as response:
write_mode = "wb"
if fail_if_exists:
write_mode = "xb"
with io.open(output_file_path, write_mode) as f:
size = 0
content_length = response.getheader('Content-Length')
for chunk in response.stream(1048576):
size = size + len(chunk)
f.write(chunk)
if (response.tell() != int(content_length)):
raise IncompleteRead(
response.tell(),
int(content_length) - response.tell()
)
def stop(self):
""" Stop the query"""
self.refresh_api_instance()
return self.query_api_instance.stop_query(self.query_id)
class Load():
"""
The Load object starts and tracks a multi-file load to a single lake table on Comotion Dash
Initialising this class starts a Load on Comotion Dash and stores the
resulting `load_id`.
The load then needs to be committed with valid checksums to be pushed to the lake.
If you wish to work with an existing load, then supply only the `load_id` parameter
Example of upload with a Load instance:
.. code-block:: python
load = Load(config = DashConfig(Auth('orgname')),
table_name = 'v1_inforce_policies',
load_as_service_client_id = '0') # Create the load
print(load.load_id) # It can be useful to keep track of load ids for debugging
for file, file_key in zip(file_path_list, file_keys): # Upload files with Load object
load.upload_file(
data = file,
file_key = file_key
)
load.commit( # Commit the load - this validates the files and pushes to the table_name specified
check_sum = { # Check that data uploaded matches what you expected before pushing to the lake. *Also see track_rows_uploaded option
'sum(face_amount)': 20000,
'count(distinct policy_number)': 1000
}
)
print(load.get_load_info()) # Run this at any time to get the latest information on the load
"""
def __init__(
self,
config: DashConfig,
load_type: str = None,
table_name: str = None,
load_as_service_client_id: str = None,
partitions: Optional[List[str]] = None,
load_id: str = None,
track_rows_uploaded: bool = None,
path_to_output_for_dryrun: str = None,
modify_lambda: Callable = None,
chunksize: int = None
):
"""
Parameters
----------
config : DashConfig
Object of type DashConfig including configuration details.
load_type : str
Load Type, initially only APPEND_ONLY supported. APPEND_ONLY means that data is appended to the lake table.
table_name : str
Name of lake table to be created and / or uploaded to.
load_as_service_client_id : str, optional
If provided, the upload is performed as if run by the service_client specified.
partitions : list[str], optional
Only applies if table does not already exist and is created. The created table will have these partitions. This must be a list of iceberg compatible partitions. Note that any load can only allow for up to 100 partitions, otherwise it will error out. If the table already exists, then this is ignored.
load_id : str, optional
In the case where you want to work with an existing load on dash, supply this parameter, and no other parameter (other than config) will be required.
track_rows_uploaded: bool, optional
If True, track the number of rows uploaded with the current Load instance. This can be used to automatically create a checksum on commit (see Load.commit); however, it is not recommended for
large files as it may increase the duration of the upload significantly.
path_to_output_for_dryrun: str, optional
if specified, no upload will be made to dash, but files
will be saved to the location specified. This is useful for testing.
modify_lambda: Callable, optional
Callable which takes a pandas.DataFrame as the first arg.
Can be used to add/modify columns in the data before upload to the lake.
chunksize: int, optional
If a file is uploaded, it will be broken into chunks with chunksize rows before uploading; if not provided, a very large default is used so that files are effectively not chunked. Note an index is added to the end of the file key to uniquely identify chunks.
"""
load_data = locals()
lowerlevel_load_sig = signature(comodash_api_client_lowlevel.Load)
lowerlevel_load_keys = [key for key in lowerlevel_load_sig.parameters.keys()]
if not(isinstance(config, DashConfig)):
raise TypeError("config must be of type comotion.dash.DashConfig")
self.config = config
with comodash_api_client_lowlevel.ApiClient(config) as api_client:
# Create an instance of the API class with provided parameters
self.load_api_instance = LoadsApi(api_client)
if (load_id is not None):
# if load_id provided, then initialise this object with the provided load_id
self.load_id = load_id
for key,value in load_data.items():
if key not in ['load_id', 'config', 'self']:
if value is not None:
raise TypeError("if load_id is supplied, then only the config parameter and no others should be supplied.")
else:
# Enter a context with an instance of the API client
lowerlevel_load_kwargs = {
key: value
for key, value in load_data.items()
if key in lowerlevel_load_keys
}
load = comodash_api_client_lowlevel.Load(**lowerlevel_load_kwargs)
# Create a new load
load_id_model = self.load_api_instance.create_load(load)
self.load_id = load_id_model.load_id
if track_rows_uploaded:
self.track_rows_uploaded = track_rows_uploaded
else:
self.track_rows_uploaded = False
self.rows_uploaded = 0
self.path_to_output_for_dryrun = path_to_output_for_dryrun
self.modify_lambda = modify_lambda
if not chunksize:
self.chunksize = 100000000 # Very large chunksize to avoid chunking unless required
else:
self.chunksize = chunksize
def refresh_api_instance(self):
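"""Re-creates the DashConfig and loads API client for this load, refreshing the underlying authentication."""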
auth_token = self.config.auth
zone = self.config.zone
orgname = auth_token.orgname
entity_type = auth_token.entity_type
if entity_type == Auth.APPLICATION:
application_client_id = auth_token.application_client_id
application_client_secret = auth_token.application_client_secret
else:
application_client_id = None
application_client_secret = None
self.config = DashConfig(
Auth(
orgname=orgname,
entity_type=entity_type,
application_client_id=application_client_id,
application_client_secret=application_client_secret
),
zone = zone
)
with comodash_api_client_lowlevel.ApiClient(self.config) as api_client:
# Create an instance of the API class with provided parameters
self.load_api_instance = LoadsApi(api_client)
def get_load_info(self) -> LoadInfo:
"""Gets the state of the load
Returns
-------
LoadInfo
Model containing all the load info, with the following attributes
`load_status`
Status of the load, one of OPEN, PROCESSING, FAIL or SUCCESS
`error_type`
Type of error if the load status is FAIL.
`error_messages`
Detailed error messages if the load status is FAIL.
"""
self.refresh_api_instance()
return self.load_api_instance.get_load(self.load_id)
def generate_presigned_url_for_file_upload(self, file_key: str = None) -> FileUploadResponse:
"""
Generates presigned URLs and STS credentials for a new file upload.
Parameters
----------
file_key : str, optional
Optional custom key for the file. This will ensure idempotence.
If multiple files are uploaded to the same load with the same file_key,
only the last one will be loaded. Must be lowercase and can include underscores.
Returns
-------
FileUploadResponse
Model containing all the relevant credentials to be able to upload a file to S3 as part of the load. This includes the following attributes:
- presigned_url : str
Presigned URL data for S3 file upload. The file can be posted to this endpoint using any AWS S3 compatible toolset.
Temporary credentials are included in the URL, so no other credentials are required.
- sts_credentials : dict
Alternatively to the presigned_url, these Temporary AWS STS credentials can be used to upload the file to the location specified by `path` and `bucket`.
This is required for various advanced toolsets, including AWS Wrangler.
- path : str
Path of the file in the S3 bucket. See the description of `sts_credentials`.
- bucket : str
Name of the S3 bucket. See the description of `sts_credentials`.
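Example
-------
One possible way to use the returned STS credentials with awswrangler (a sketch; the local file path is a placeholder):
.. code-block:: python
import boto3
import awswrangler as wr
response = load.generate_presigned_url_for_file_upload(file_key='my_file')
session = boto3.Session(
aws_access_key_id=response.sts_credentials['AccessKeyId'],
aws_secret_access_key=response.sts_credentials['SecretAccessKey'],
aws_session_token=response.sts_credentials['SessionToken']
)
wr.s3.upload(local_file='my_file.parquet', path=f"s3://{response.bucket}/{response.path}", boto3_session=session)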
"""
file_upload_request = FileUploadRequest()
if file_key:
file_upload_request = FileUploadRequest(file_key=file_key)
self.refresh_api_instance()
return self.load_api_instance.generate_presigned_url_for_file_upload(self.load_id, file_upload_request=file_upload_request)
def upload_df(self,
data: pd.DataFrame,
file_key: str = None):
"""
Uploads a `pandas.DataFrame` as a parquet file to an S3 bucket using a presigned URL.
If `path_to_output_for_dryrun` is specified for the load, write the file to a local directory instead.
Before uploading, `modify_lambda` is applied. Then, column names are forced to be lowercase and spaces are replaced with underscores.
This is to prevent potential schema issues in the Dash lake.
Parameters:
-----------
data : pandas.DataFrame
The DataFrame to be uploaded. This should be a pandas.DataFrame object.
file_key : str, optional
Optional custom key for the file to be uploaded. If not provided, a random file key is created. See Load.create_file_key().
This will ensure idempotence. If multiple files are uploaded to the same load with the same `file_key`, only the last one will be pushed to the lake.
Must be lowercase and can include underscores.
Returns:
--------
None
Example:
--------
.. code-block:: python
import pandas as pd
load = Load(dashconfig)
df = pd.read_parquet('path/to/file.parquet')
load.upload_df(data = df, file_key='my_file_id')
"""
if not isinstance(data, pd.DataFrame):
raise ValueError("data should be a valid pandas.DataFrame object")
if not file_key:
file_key = self.create_file_key()
if self.modify_lambda:
self.modify_lambda(data)
data.columns = [re.sub(r'\s+', '_', column.lower()) for column in data.columns] # Lowercase column names and replace whitespace with underscores
table = pa.Table.from_pandas(data)
parquet_buffer = io.BytesIO()
pq.write_table(table, parquet_buffer)
parquet_buffer.seek(0)
file_upload_response = self.generate_presigned_url_for_file_upload(file_key=file_key)
if not isinstance(file_upload_response, FileUploadResponse):
raise ValueError("file_upload_response should be a valid instance of FileUploadResponse.")
else:
bucket = file_upload_response.bucket
key = file_upload_response.path
# Upload to s3 if not a dry-run
if not self.path_to_output_for_dryrun:
s3_file_name = basename(file_upload_response.path)
# Create a session with AWS credentials from the presigned URL
my_session = boto3.Session(
aws_access_key_id=file_upload_response.sts_credentials['AccessKeyId'],
aws_secret_access_key=file_upload_response.sts_credentials['SecretAccessKey'],
aws_session_token=file_upload_response.sts_credentials['SessionToken']
)
# Upload the Parquet buffer as a chunk to S3
print(f"Uploading to S3: {s3_file_name}")
upload_response = wr.s3.upload(
local_file=parquet_buffer,
path=f"s3://{bucket}/{key}",
boto3_session=my_session,
use_threads=True
)
else:
# Commence dry run
local_path = join(self.path_to_output_for_dryrun, f"{basename(key)}.parquet")
table = pa.Table.from_pandas(data)
pq.write_table(table, local_path)
print(f"File written locally to: {local_path}")
upload_response = 'DRYRUN_COMPLETE' # Arbitrary response as a local file write has no return value
if self.track_rows_uploaded:
# Count the rows in the Parquet file
rows_uploaded = data.shape[0]
self.rows_uploaded += rows_uploaded
print(f"Successfully uploaded {key}: {rows_uploaded} rows")
print(f"Total rows uploaded for load {self.load_id}: {self.rows_uploaded}")
else:
print(f"Upload completed: {key}")
return upload_response
def upload_file(
self,
data,
file_key: str = None,
use_file_name_as_key: bool = False,
max_workers: int = None,
**pd_read_kwargs
):
"""
Uploads a file to the lake table specified in the load, or a local file if `path_to_output_for_dryrun` was specified.
The file provided is read into a `pandas.DataFrame` using `pd_read_kwargs` with an appropriate pandas function.
Note an index is added to the end of the file key to uniquely identify chunks uploaded.
Parameters
----------
data : Any
The file to be uploaded. This should be readable by `pandas.read_csv`, `pandas.read_parquet`, `pandas.read_json` or `pandas.read_excel`.
file_key : str, optional
A unique key for the file being uploaded. If not provided, a key will be generated.
use_file_name_as_key : bool, optional
If True, the file name will be used as the file key. If False, a random key will be generated. Will throw an error when this is True and a file key is provided.
max_workers : int, optional
The maximum number of threads to use for concurrent uploads (passed to concurrent.futures.ThreadPoolExecutor)
**pd_read_kwargs
Additional keyword arguments to pass to the pandas read function (one of [pd.read_csv, pd.read_parquet, pd.read_json, pd.read_excel]).
You should not pass the variable pointing to the file here (e.g. filepath_or_buffer in pandas.read_csv), as this is passed in the data parameter.
Chunksize and nrows should also not be provided as extra parameters.
If you do not provide dtype, dtype will be determined automatically from the first chunk of data.
Returns
-------
List[Any]
A list of responses from the load upload API call for each chunk.
Raises
------
ValueError
If the file type cannot be determined or if an error occurs during the upload of any chunk.
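Example
-------
A sketch of uploading a csv file (paths, table name and pandas kwargs are placeholders):
.. code-block:: python
load = Load(config=config, table_name='v1_inforce_policies', load_as_service_client_id='0', track_rows_uploaded=True)
load.upload_file(data='data/policies.csv', use_file_name_as_key=True, sep=',')
load.commit() # checksum built automatically because track_rows_uploaded is True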
"""
responses = []
invalid_keys = {'filepath_or_buffer', 'chunksize', 'nrows', 'path', 'path_or_buf', 'io'}
provided_invalid_keys = invalid_keys.intersection(pd_read_kwargs.keys())
if provided_invalid_keys:
raise ValueError(f"Do not provide the following keys: {', '.join(provided_invalid_keys)}")
print(f"Uploading file: {data}")
# Reading CSV in chunks, converting each to Parquet, and uploading
try_functions = [pd.read_csv, pd.read_parquet, pd.read_json, pd.read_excel]
if file_key and use_file_name_as_key:
raise Exception("Cannot provide a file key when use_file_name_as_key is True")
elif file_key and not use_file_name_as_key:
pass # To be explicit only
elif use_file_name_as_key and not file_key:
file_key = os.path.basename(data).split('.')[0] # Remove file extension
elif not use_file_name_as_key and not file_key:
file_key = self.create_file_key()
func_to_use = None
for func in try_functions:
try:
func(data, nrows=1, **pd_read_kwargs)
func_to_use = func
break
except Exception:
pass # This reader could not parse the file; try the next one
if not func_to_use:
raise ValueError(f"Could not determine file type for datasource with the following file key: {file_key}")
try:
i = 1
chunk_futures = []
with ThreadPoolExecutor(max_workers=max_workers) as chunk_ex: # Using threads for concurrent chunk uploads
for chunk in func_to_use(data, chunksize=self.chunksize, **pd_read_kwargs):
file_key_to_use = file_key + f"_{i}"
future = chunk_ex.submit(self.upload_df,
data=chunk,
file_key=file_key_to_use)
chunk_futures.append(future)
i += 1
for future in as_completed(chunk_futures):
responses.append(future.result())
except Exception as e:
raise ValueError(f"Error when uploading chunk: {e}")
print("All chunks uploaded successfully")
return responses
def upload_dash_query(
self,
data: Query,
file_key: str = None,
max_workers: int = None,
**pd_read_kwargs
):
"""
Uploads the result of a `dash.Query` object to the specified lake table, or a local file if `path_to_output_for_dryrun` was specified.
The result is read into a `pandas.DataFrame` and uploaded with the `Load.upload_df()` function.
Note an index is added to the end of the file key to uniquely identify chunks uploaded.
Parameters
----------
data : Query
The Query whose result is to be uploaded.
file_key : str, optional
A unique key for the file being uploaded. If not provided, a key will be generated.
max_workers : int, optional
The maximum number of threads to use for concurrent uploads (passed to `concurrent.futures.ThreadPoolExecutor`)
**pd_read_kwargs
Additional keyword arguments to pass to the pandas function.
Note that filepath_or_buffer, chunksize and dtype are passed by default and so duplicating those here could cause issues.
Returns
-------
List[Any]
A list of responses from the load upload API call for each chunk.
Raises
------
ValueError
If an error occurs during the upload of any chunk or during the Query.
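Example
-------
A sketch of copying a query result into a load (SQL and names are placeholders):
.. code-block:: python
query = Query(config=config, query_text='select * from source_table')
load = Load(config=config, table_name='target_table', load_as_service_client_id='0', track_rows_uploaded=True)
load.upload_dash_query(data=query, file_key='source_table_extract')
load.commit()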
"""
responses = []
print(f"Uploading Query with ID: {data.query_id}")
if not file_key:
file_key = self.create_file_key()
invalid_keys = {'filepath_or_buffer', 'chunksize', 'nrows', 'path', 'path_or_buf', 'io'}
provided_invalid_keys = invalid_keys.intersection(pd_read_kwargs.keys())
if provided_invalid_keys:
raise ValueError(f"Do not provide the following keys: {', '.join(provided_invalid_keys)}")
try:
i = 1
chunk_futures = []
data.wait_to_complete()
print(f"Query completed with state: {data.state()}")
if data.state() == data.SUCCEEDED_STATE:
with ThreadPoolExecutor(max_workers=max_workers) as chunk_ex: # Using threads for concurrent chunk uploads
for chunk in pd.read_csv(data.get_csv_for_streaming(), chunksize=self.chunksize, **pd_read_kwargs):
file_key_to_use = file_key + f"_{i}"
future = chunk_ex.submit(self.upload_df,
data=chunk,
file_key=file_key_to_use)
chunk_futures.append(future)
i += 1
for future in as_completed(chunk_futures):
responses.append(future.result())
else:
print(f"Query Status: {data.get_query_info().status}")
print("Please resolve query before re-attempting the upload.")
except Exception as e:
raise ValueError(f"Error when uploading chunk: {e}")
print("All chunks uploaded successfully")
return responses
def commit(self, check_sum: Optional[Dict[str, Union[int, float, str]]] = None):
"""
Kicks off the commit of the load. A checksum must be provided
which is checked on the server side to ensure that the data provided
has integrity. This is automatically created if you specify `track_rows_uploaded = True` when creating the load.
Parameters
----------
check_sum : Dict[str, Union[int, float, str]] (optional)
Checksum data for the files to be committed.
Checksums must be in the form of a dictionary, with presto / trino expressions
as the key, and the expected result as the value.
A check sum is not required if `track_rows_uploaded` was set to True for the load.
In that case the checksum `{'count(*)': rows_uploaded}` is built automatically and added as an extra checksum.
Example:
.. code-block:: python
{
"count(*)" : 53,
"sum(my_value)": 123.3
}
"""
if not check_sum:
check_sum = {}
if not self.track_rows_uploaded:
raise KeyError("check_sum must be provided for this load as track_rows_uploaded was specified as False.")
if self.track_rows_uploaded:
check_sum["count(*)"] = self.rows_uploaded
load_commit = comodash_api_client_lowlevel.LoadCommit(check_sum=check_sum)
self.refresh_api_instance()
return self.load_api_instance.commit_load(self.load_id, load_commit)
def create_file_key(self) -> str:
"""Creates a random, valid file key."""
# Generate a UUID
raw_uid = str(uuid.uuid4())
# Replace non-alphanumeric characters with underscores
file_key = 'x_' + re.sub(r'[^a-zA-Z0-9]', '_', raw_uid) # Prefix with x_ in case the uuid starts with a digit
return file_key
def wait_to_complete(self):
"""Blocks until the load is in a complete state.
Returns
-------
str
State of the load, after processing completed.
"""
while True:
load_info = self.get_load_info()
if load_info.load_status != 'PROCESSING':
return load_info.load_status
time.sleep(5)
class DashBulkUploader():
"""
Class to handle multiple loads with utility functions by leveraging the `Load` class.
Since a `Load` creates an upload to a lake table, `DashBulkUploader` helps to manage uploads to several lake tables with 1 object.
Example of upload with a `DashBulkUploader` instance:
.. code-block:: python
config = DashConfig(Auth(orgname = 'my_org_name'))
uploader = DashBulkUploader(config = config)
# In practice you would typically loop through a config and run the code below repeatedly. A single upload is shown for demonstration purposes.
my_lake_table = 'v1_inforce_policies'
# Create the load in the uploader
uploader.add_load(
table_name = my_lake_table,
check_sum = {
'count(*)': 50000
}, # Alternatively, provide the track_rows_uploaded arg as true
load_as_service_client_id = '0'
)
# Add the relevant datasources to the load. This can point to a directory to upload all contained files
uploader.add_data_to_load(
table_name = my_lake_table,
data = 'data/inforce_policies'
)
# View uploads added
print(uploader.uploads)
uploader.execute_all_uploads() # Use execute_upload or execute_multiple_uploads if only certain uploads should be kicked off.
print(uploader.get_load_info()) # View status of uploads. This also refreshes uploader.uploads
# Remove uploads/datasources if you want to re-use the same instance
uploader.remove_load(table_name = my_lake_table)
"""
def __init__(self,
config: DashConfig) -> None:
self.config = config
self.pending_load_statuses = ['OPEN']
self.uploads = {}
def add_load(
self,
table_name: str,
check_sum: Optional[Dict[str, Union[int, float, str]]] = None,
modify_lambda: Callable = None,
load_type: str = 'APPEND_ONLY',
load_as_service_client_id: str = None,
partitions: Optional[List[str]] = None,
track_rows_uploaded: bool = False,
path_to_output_for_dryrun: str = None,
chunksize: int = None
) -> None:
"""
Creates a new load for a specified lake table. This function initializes the load
process, ensuring that the table name is in lowercase and that a checksum or row tracking
is provided for data integrity. Created loads can be fetched from the `DashBulkUploader.uploads` instance attribute.
Parameters
----------
table_name : str
The name of the lake table. Must be in lowercase.
check_sum : Optional[Dict[str, Union[int, float, str]]]
Checksum data for the files to be committed. The checksum should be a dictionary
with Presto/Trino expressions as keys and their expected results as values.
modify_lambda : Callable, optional
A lambda function to modify the data before loading.
load_type : str, default 'APPEND_ONLY'
The type of load operation. Default is 'APPEND_ONLY'.
load_as_service_client_id : str, optional
The service client ID to use for the load.
If service_client is not a field in the source data added to the load, this must be provided or the load will fail on commit.
partitions : Optional[List[str]], optional
List of partitions for the load to improve query efficiency from the Dash lake.
track_rows_uploaded : bool, default False
Whether to track the number of rows uploaded for the load.
path_to_output_for_dryrun : str, optional
If specified, no upload will be made to dash, but files
will be saved to the location specified. This is useful for testing.
chunksize: int, optional
Data source will be broken into chunks with chunksize rows before uploading.
Raises
------
ValueError
If the table name contains uppercase characters or if a load has already been created for the table.
KeyError
If neither `check_sum` nor `track_rows_uploaded` is provided.
Returns
-------
None
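Example
-------
A sketch of adding a dry-run load with partitions (names and paths are placeholders):
.. code-block:: python
uploader = DashBulkUploader(config=config)
uploader.add_load(
table_name='v1_claims',
track_rows_uploaded=True,
load_as_service_client_id='0',
partitions=['claim_year'],
path_to_output_for_dryrun='local_test_output' # remove this argument to upload to Dash
)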
"""
if table_name.lower() != table_name:
raise ValueError('Only lowercase characters allowed for table_name.')
if table_name in self.uploads:
raise ValueError(f'A load has been created for the lake table already: {table_name}. Call DashBulkUploader().remove_load({table_name}) if you want to re-start this load.')
if not check_sum and not track_rows_uploaded:
raise KeyError("Invalid arguments: Either provide a check_sum value or set track_rows_uploaded to True.")
if not load_as_service_client_id:
print("WARNING: Dataset will not upload without specifying load_as_service_client_id option unless there is a column in the data source called service_client_id.")
print(f"Creating new load for lake table: {table_name}")
self.config._check_and_refresh_token()
load = Load(
config=self.config,
load_type=load_type,
table_name=table_name,
load_as_service_client_id=load_as_service_client_id,
partitions=partitions,
track_rows_uploaded=track_rows_uploaded,
path_to_output_for_dryrun=path_to_output_for_dryrun,
modify_lambda=modify_lambda,
chunksize=chunksize
)
print(f"Load ID: {load.load_id}")
self.uploads[table_name] = {
'load': load,
'data_sources': {},
'check_sum': check_sum,
'load_status': load.get_load_info().load_status
}
def add_data_to_load(
self,
table_name: str,
data: Union[str, pd.DataFrame, Query],
file_key: str = None,
source_type: str = None,
**pd_read_kwargs
) -> None:
"""
Adds data to an existing load for a specified lake table. This function supports adding data
from various sources including dataframes, directories, and files.
Parameters
----------
table_name : str
The name of the lake table to which data will be added. A load should already be added for this table_name.
data : Union[str, pd.DataFrame, Query]
The data to be added. Can be a path to a file or directory, a `pandas.DataFrame` or a `dash.Query`. See `Load` for how different upload types will be executed.
file_key : str, optional
Optional custom key for the file. This will ensure idempotence. Must be lowercase and can include underscores.
If not provided, a random file key will be generated using `Load.create_file_key()`.
If a directory is provided as the source of data, `file_key` is ignored and a random file key is generated for each file in the directory.
If multiple files are uploaded to the same load with the same `file_key`, only the last one will be pushed to the lake.
source_type : str, optional
The type of data source. Can be 'df' for DataFrame, 'dir' for directory, or 'file' for file.
If not specified, the function will attempt to infer the source type.
If a directory is provided, loop through the paths in the directory from `listdir()` and add valid files as datasources for the lake table.
pd_read_kwargs : dict, optional
Additional keyword arguments to pass to the pandas read function (one of [pd.read_csv, pd.read_parquet, pd.read_json, pd.read_excel]).
Raises
------
ValueError
If no existing load is found for the specified table, if the table name is not lowercase,
or if the source type cannot be identified.
KeyError
If neither `check_sum` nor `track_rows_uploaded` is provided.
Returns
-------
None
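Example
-------
A sketch of adding different source types to the same load (names are placeholders):
.. code-block:: python
uploader.add_data_to_load(table_name='v1_claims', data=df, file_key='claims_df') # pandas DataFrame
uploader.add_data_to_load(table_name='v1_claims', data='data/claims', source_type='dir') # every file in a directory
uploader.add_data_to_load(table_name='v1_claims', data='data/claims_extra.csv', sep='|') # single file with pandas kwargs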
"""
upload = self.uploads.get(table_name)
if not upload:
raise ValueError(f"No existing load for lake table: {table_name}. First run add_load with the table_name specified before adding data to the load.")
load = upload['load']
if not file_key:
file_key = load.create_file_key()
if not source_type:
if isinstance(data, pd.DataFrame):
source_type = 'df'
elif isinstance(data, Query):
source_type = 'query'
elif isdir(data):
source_type = 'dir'
elif isfile(data):
source_type = 'file'
else:
raise ValueError("Source type could not be identified. Please fix datasource or specify the source_type as ['df', 'dir', 'file', 'query']")
if source_type == 'dir':
print(f"Unpacking data sources in directory: {data}")
data_files = [join(data, file_name) for file_name in listdir(data)]
for file in data_files:
if not isfile(file): # file already includes the directory path
print(f"The following path in the directory provided is not a file and so will not be added as a datasource: {file}")
else:
self.add_data_to_load(
table_name=table_name,
data=file,
file_key=None, # File keys can't be applied to directories - individual files should be specified if this is required
source_type='file',
**pd_read_kwargs
)
else:
data_source = {
'data': data,
'source_type': source_type,
'pd_read_kwargs': pd_read_kwargs
}
upload['data_sources'][file_key] = data_source
self.uploads[table_name] = upload
def remove_data_from_load(
self,
table_name,
file_key
):
"""
Deletes the data source with the specified file key for table_name from the uploads attribute, if the load has not been committed yet.
"""
if self.uploads[table_name]['load_status'] in self.pending_load_statuses:
print(f"Removing {file_key} from load for {table_name}")
self.uploads[table_name]['data_sources'].pop(file_key)
else:
raise Exception("Load has already been committed. First run remove_load and attempt to re-upload.")
def remove_load(
self,
table_name
):
"""
Deletes the load for the specified table_name from the uploads attribute.
"""
print(f"Removing {table_name} from uploads")
self.uploads.pop(table_name)
def execute_upload(
self,
table_name: str,
max_workers: int = None
) -> None:
"""
Executes the upload process for a specified lake table. This function uses multi-threading
to upload data sources concurrently and commits the load upon completion.
Parameters
----------
table_name : str
The name of the lake table to which data will be uploaded.
max_workers : int
The maximum number of threads to use for concurrent uploads (passed to `concurrent.futures.ThreadPoolExecutor`)
Raises
------
ValueError
If no existing load is found for the specified table.
Returns
-------
None
"""
upload = self.uploads[table_name]
load = upload['load']
# Refresh load status
load_status = load.get_load_info().load_status
if load_status in self.pending_load_statuses: # Only perform upload on pending loads
data_sources = upload['data_sources']
check_sum = upload['check_sum']
print(f"Uploading datasources to lake table: {table_name}")
upload_futures = []
with ThreadPoolExecutor(max_workers=max_workers) as upload_executor: # Use multi-threading to speed up uploads
for file_key, data_source in data_sources.items():
data = data_source['data']
source_type = data_source['source_type']
pd_read_kwargs = data_source['pd_read_kwargs']
print(f"Uploading data source with file key: {file_key}")
if source_type == 'df':
print("Uploading from DataFrame")
future = upload_executor.submit(load.upload_df,
data=data,
file_key=file_key
)
elif source_type == 'query':
future = upload_executor.submit(
load.upload_dash_query,
data=data,
file_key=file_key,
**pd_read_kwargs
)
elif source_type == 'file':
future = upload_executor.submit(load.upload_file,
data=data,
file_key=file_key,
**pd_read_kwargs
)
upload_futures.append(future)
for f in as_completed(upload_futures):
try:
f.result()
except Exception as e:
raise ValueError(f"Error uploading data source: {e}. Load not yet committed.")
# End of uploads
# Commit load
if not load.path_to_output_for_dryrun:
print(f"All uploads completed.")
if check_sum:
print(f"Committing load with the following checksums: {check_sum}")
else:
print("Committing load.")
load.commit(check_sum=check_sum)
self.uploads[table_name]['load_status'] = load.get_load_info().load_status
def execute_multiple_uploads(
self,
table_names: List[str],
max_workers: int = None
):
"""
Uses execute_upload function for each table name in the list provided.
"""
for table_name in table_names:
try:
self.execute_upload(
table_name = table_name,
max_workers = max_workers
)
except Exception as e:
print(f"Error executing upload to lake table {table_name}: {e}")
def execute_all_uploads(self):
"""
Uses execute_upload function for all loads created with the DashBulkUploader.
"""
table_names = list(self.uploads.keys())
self.execute_multiple_uploads(table_names = table_names)
def get_load_info(self):
"""
Retrieves the load information for all loads created. This also updates the load status for each
Load created by the `DashBulkUploader`.
Returns
-------
load_info : dict
A dictionary containing the load information for each table, with table names as keys
and their respective load information (as returned by `Load.get_load_info`) as values.
Raises
------
Exception
If an error occurs while retrieving the load information for any table, it is caught
and printed, but the function continues to retrieve the remaining load statuses.
"""
load_info = {}
for table_name, upload in self.uploads.items():
load = upload['load']
try:
self.uploads[table_name]['load_status'] = load.wait_to_complete()
load_info[table_name] = load.get_load_info()
except Exception as e:
print(f"Error getting load {load.load_id}: {e}")
self.uploads[table_name]['load_status'] = f'ERROR: {e}'
return load_info
def v1_upload_csv(
file: Union[str, io.FileIO],
dash_table: str,
dash_orgname: str,
dash_api_key: str,
encoding: str = 'utf-8',
chunksize: int = 30000,
modify_lambda: Callable = None,
path_to_output_for_dryrun: str = None,
service_client_id: str = '0'
):
"""
.. Warning::
This function is deprecated. Use `read_and_upload_file_to_dash` instead.
Reads a file and uploads it to Dash.
This function will:
- Read a csv file
- Break it up into multiple csv's
- each with a maximum number of lines defined by chunksize
- upload them to dash
Parameters
----------
file : Union[str, io.FileIO]
Either a path to the file to be uploaded,
or a FileIO stream representing the file to be uploaded
Should be an unencrypted, uncompressed CSV file
dash_table: str
name of Dash table to upload the file to
dash_orgname: str
orgname of your Dash instance
dash_api_key: str
valid api key for Dash API
encoding: str
the encoding of the source file. defaults to utf-8.
chunksize: int
(optional)
the maximum number of lines to be included in each file.
Note that this should be low enough that the zipped file is less
than Dash maximum gzipped file size. Defaults to 30000.
modify_lambda:
(optional)
a callable that receives the pandas dataframe read from the
csv. Gives the opportunity to modify - such as adding a timestamp
column.
Is not required.
path_to_output_for_dryrun: str
(optional)
if specified, no upload will be made to dash, but files
will be saved to the location specified. This is useful for
testing.
multiple files will be created: [table_name].[i].csv.gz where i
represents multiple file parts
service_client_id: str
(optional)
if specified, specifies the service client for the upload. See the dash documentation for an explanation of service client.
https://docs.comotion.us/Comotion%20Dash/Analysts/How%20To/Prepare%20Your%20Data%20Model/Y%20Service%20Client%20and%20Row%20Level%20Security.html#service-client-and-row-level-security
Returns
-------
List
List of http responses
"""
file_reader = pd.read_csv(
file,
chunksize=chunksize,
encoding=encoding,
dtype=str # Set all columns to strings. Dash will still infer the type, but this makes sure it doesn't alter the contents of the csv before upload
)
i = 1
responses = []
for file_df in file_reader:
if modify_lambda is not None:
modify_lambda(file_df)
csv_stream = create_gzipped_csv_stream_from_df(file_df)
if path_to_output_for_dryrun is None:
response = upload_csv_to_dash(
dash_orgname=dash_orgname,
dash_api_key=dash_api_key,
dash_table=dash_table,
csv_gz_stream=csv_stream,
service_client_id=service_client_id
)
responses.append(response.text)
else:
with open(
join(
path_to_output_for_dryrun,
dash_table + "." + str(i) + ".csv.gz"
),
"wb"
) as f:
f.write(csv_stream.getvalue())
i = i + 1
return responses
def upload_csv_to_dash(
dash_orgname: str, # noqa
dash_api_key: str,
dash_table: str,
csv_gz_stream: io.FileIO,
service_client_id: str = '0'
) -> requests.Response:
"""Uploads csv gzipped stream to Dash
Expects a csv gzipped stream to upload to dash.
Args:
dash_orgname (str): Dash organisation name for dash instance
dash_api_key (str): Valid API key for the organisation instance
dash_table (str): Table name to upload to
csv_gz_stream (io.FileIO): Gzipped csv stream to be uploaded
Returns:
requests.Response: response from dash api
Raises:
HTTPError: If one is raised by the call
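Example:
A sketch of gzipping a dataframe and uploading it (orgname, api key and table are placeholders):
.. code-block:: python
df = pd.read_csv('policies.csv', dtype=str)
csv_gz_stream = create_gzipped_csv_stream_from_df(df)
response = upload_csv_to_dash(
dash_orgname='myorgname',
dash_api_key='my_api_key',
dash_table='inforce_policies',
csv_gz_stream=csv_gz_stream
)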
"""
url = "https://api.comodash.io/v1/data-input-file"
headers = {
'Content-Type': 'application/gzip',
'service_client_id': service_client_id,
'x-api-key': dash_api_key,
'org-name': dash_orgname,
'table-name': dash_table
}
dash_response = requests.request(
"POST",
url,
headers=headers,
data=csv_gz_stream.getbuffer()
)
dash_response.raise_for_status()
return dash_response
def create_gzipped_csv_stream_from_df(df: pd.DataFrame) -> io.BytesIO:
"""Returns a gzipped, utf-8 csv file bytestream from a pandas dataframe
Useful to help upload dataframes to dash
It does not break the file up, so be sure to apply a maximum chunksize
to the dataframe before calling this - otherwise Dash's maximum file size limits will
cause an error
Parameters
----------
df : pd.DataFrame
Dataframe to be turned into a bytestream
Returns
-------
io.BytesIO
The Bytestream
"""
csv_stream = io.BytesIO()
df.to_csv(
csv_stream,
compression="gzip",
encoding="utf-8",
index=False,
quoting=csv.QUOTE_NONNUMERIC
)
return csv_stream
def read_and_upload_file_to_dash(
file: Union[str, io.FileIO],
dash_table: str,
dash_orgname: str,
file_key: str = None,
dash_api_key: str = None,
check_sum: Optional[Dict[str, Union[int, float, str]]] = None,
encoding: str = 'utf-8',
chunksize: int = 30000,
modify_lambda: Callable = None,
path_to_output_for_dryrun: str = None,
service_client_id: str = '0',
partitions: Optional[List[str]] = None,
load_type: str = 'APPEND_ONLY',
data_model_version: str = None,
entity_type: str = Auth.USER,
application_client_id: str = None,
application_client_secret: str = None
) -> Union[List[Any], DashBulkUploader]:
"""
.. Warning::
This function will be deprecated on 1 December 2025. Please migrate to the Load/DashBulkUploader classes before then.
Reads a file and uploads it to Dash.
This function will:
- Read a CSV file
- Break it up into multiple CSVs, each with a maximum number of lines defined by `chunksize`
- Upload them to Dash
Parameters
----------
file : Union[str, io.FileIO]
Either a path to the file to be uploaded, or a FileIO stream representing the file to be uploaded.
Should be an unencrypted, uncompressed CSV file.
dash_table : str
Name of the Dash table to upload the file to.
dash_orgname : str
Orgname of your Dash instance.
file_key : str, optional
A unique key for the file being uploaded. If not provided, a key will be generated.
dash_api_key : str, optional
Valid API key for Dash API. Required for v1 data model uploads.
check_sum : Optional[Dict[str, Union[int, float, str]]], optional
Checksum data for the files to be committed. The checksum should be a dictionary
with Presto/Trino expressions as keys and their expected results as values.
encoding : str, default 'utf-8'
The encoding of the source file.
chunksize : int, default 30000
The maximum number of lines to be included in each file. Note that this should be low enough
that the zipped file is less than Dash's maximum gzipped file size.
modify_lambda : Callable, optional
A callable that receives the pandas DataFrame read from the CSV. Provides the opportunity to modify
the DataFrame, such as adding a timestamp column.
path_to_output_for_dryrun : str, optional
If specified, no upload will be made to Dash, but files will be saved to the location specified.
This is useful for testing. Multiple files will be created (1 per chunk)
service_client_id : str, optional
If specified, specifies the service client for the upload. See the Dash documentation for an explanation
of the service client.
https://docs.comotion.us/Comotion%20Dash/Analysts/How%20To/Prepare%20Your%20Data%20Model/Y%20Service%20Client%20and%20Row%20Level%20Security.html#service-client-and-row-level-security
partitions : Optional[List[str]], optional
List of partitions for the load.
load_type : str, default 'APPEND_ONLY'
The type of load operation. Default is 'APPEND_ONLY'.
data_model_version : str, optional
The data model version to use for the upload. If not specified, the function will determine the version.
If the migration status for the org is 'Completed', v2 is the model version. Otherwise, v1 is the model version.
data_model_version only needs to be specified in exceptional circumstances where there are issues determining the migration status.
entity_type : str, default Auth.USER
The entity type for authentication. Use Auth.USER if uploading as a user. Use Auth.APPLICATION if uploading with application credentials.
application_client_id : str, optional
The application client ID for authentication.
application_client_secret : str, optional
The application client secret for authentication.
Returns
-------
Union[List[Any], DashBulkUploader]
For v1 data model uploads, returns a list of HTTP responses.
For v2 data model uploads, returns the DashBulkUploader instance.
Raises
------
ValueError
If the API key is not specified for v1 lake upload or if the file type cannot be determined.
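Example
-------
A sketch of a v2 upload (values are placeholders):
.. code-block:: python
uploader = read_and_upload_file_to_dash(
file='data/policies.csv',
dash_table='v1_inforce_policies',
dash_orgname='myorgname',
check_sum={'count(*)': 50000},
service_client_id='0'
)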
"""
auth_token = Auth(orgname=dash_orgname,
entity_type=entity_type,
application_client_id=application_client_id,
application_client_secret=application_client_secret)
config = DashConfig(auth = auth_token)
uploader = DashBulkUploader(config = config)
if not data_model_version or data_model_version not in ['v1', 'v2']:
print("Determining Data Model Version")
try:
config = DashConfig(auth_token)
# Get migration status
migration = Migration(config)
migration_status = migration.status().full_migration_status
print('Migration Status: ' + migration_status)
if migration_status in ['Completed', 'Complete']:
data_model_version = 'v2'
else:
data_model_version = 'v1'
except Exception as e:
print(f'Error determining data model version: {e}')
data_model_version = 'v1'
else:
config = DashConfig(auth_token)
print(f"Uploading to data model {data_model_version}")
if data_model_version == 'v1':
if dash_api_key is None:
raise ValueError("API Key needs to be specified for v1 lake upload.")
responses = v1_upload_csv(
file=file,
dash_table=dash_table,
dash_orgname=dash_orgname,
dash_api_key=dash_api_key,
encoding=encoding,
chunksize=chunksize,
modify_lambda=modify_lambda,
path_to_output_for_dryrun=path_to_output_for_dryrun,
service_client_id=service_client_id
)
return responses
elif data_model_version == 'v2':
track_rows_uploaded = not check_sum
uploader.add_load(
table_name=dash_table,
check_sum=check_sum,
modify_lambda=modify_lambda,
load_type=load_type,
load_as_service_client_id=service_client_id,
partitions=partitions,
track_rows_uploaded=track_rows_uploaded,
path_to_output_for_dryrun=path_to_output_for_dryrun,
chunksize=chunksize
)
uploader.add_data_to_load(
table_name=dash_table,
data=file,
file_key=file_key
)
uploader.execute_upload(table_name=dash_table)
return uploader
class Migration():
"""
The Migration object starts and tracks a migration from lake v1 to lake v2. It can only be run once, after which the old lake will be disabled.
"""
def __init__(
self,
config: DashConfig
):
if not(isinstance(config, DashConfig)):
raise TypeError("config must be of type comotion.dash.DashConfig")
with comodash_api_client_lowlevel.ApiClient(config) as api_client:
# Create an instance of the API class with provided parameters
self.migration_api_instance = MigrationsApi(api_client)
def start(
self,
migration_type: str = "FLASH_SCHEMA",
clear_out_new_lake: bool = False
):
"""
Starts a migration.
Migrations can take a number of hours to complete. So get a cup of coffee.
Calling this method starts the migration on Comotion Dash. If a migration is already in progress, it will track the active migration.
There are two types of migration, set by `migration_type`:
- `FULL_MIGRATION`: runs a full migration by copying all the data across, updating the insights tables and deactivating data loads to the old lake.
- `FLASH_SCHEMA`: copies the schema and one line of data per table to the new lake. This is useful for developing and testing ETL scripts.
There is an option to clear out the new lake on migration, set by the boolean parameter `clear_out_new_lake`. This is useful when testing has taken place and data needs to be cleared.
If `clear_out_new_lake` is set to False, the migration will fail if there is data in the new lake.
Parameters
----------
migration_type : str
whether to run a full migration ("FULL_MIGRATION") or only copy the schema of the lake across to the new lake ("FLASH_SCHEMA")
clear_out_new_lake : bool
whether to clear out the new lake on migration.
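Example
-------
A minimal sketch (flashes the schema to the new lake without clearing it out):
.. code-block:: python
migration = Migration(DashConfig(Auth('myorgname')))
migration.start(migration_type='FLASH_SCHEMA', clear_out_new_lake=False)
print(migration.status().full_migration_status)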
"""
if migration_type not in ['FLASH_SCHEMA','FULL_MIGRATION']:
raise ValueError('`migration_type` must be one of FLASH_SCHEMA or FULL_MIGRATION')
migration_data = {
'migration_type': migration_type,
'clear_out_new_lake': 'CLEAR_OUT' if clear_out_new_lake else 'DO_NOT_CLEAR_OUT'
}
# Create an instance of the API class with provided parameters
migration = comodash_api_client_lowlevel.Migration(**migration_data)
try:
self.migration_api_instance.start_migration(migration)
except comodash_api_client_lowlevel.exceptions.BadRequestException as exp:
raise ValueError(json.loads(exp.body)['message'])
except comodash_api_client_lowlevel.exceptions.ApiException as e:
raise ValueError(json.loads(e.body)['message'])
def status(self):
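"""Gets the current migration status from the API.
Returns
-------
Migration status model returned by the API; see its `full_migration_status` attribute for the overall status.
"""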
try:
migration_status = self.migration_api_instance.get_migration()
return migration_status
except comodash_api_client_lowlevel.exceptions.BadRequestException as exp:
raise ValueError(json.loads(exp.body)['message'])
except comodash_api_client_lowlevel.exceptions.ApiException as e:
raise ValueError(json.loads(e.body)['message'])
def upload_from_oracle(
sql_host: str,
sql_port: int,
sql_service_name: str,
sql_username: str,
sql_password: str,
sql_query: str,
dash_table: str,
dash_orgname: str,
dash_api_key: str,
dtypes: dict = None,
include_snapshot: bool = True,
export_csvs: bool = True,
chunksize: int = 50000,
output_path: str = None,
sep: str ='\t',
max_tries: int = 5
):
"""
Uploads data from an Oracle SQL database object to dash.
This function will:
- get the total number of row chunks needed for the sql query
- get chunks of data from the SQL database
- upload them to dash
- append them to csv output (if specified)
- save error chunks as csv (if any)
Parameters
----------
sql_host: str
SQL host e.g. '192.168.0.1'
sql_port: int
SQL port number e.g. 9005
sql_service_name: str
SQL service name e.g. 'myservice'
sql_username: str
SQL username
sql_password: str
SQL password
sql_query: str
SQL query
dash_table: str
name of Dash table to upload the data to
dash_orgname: str
orgname of your Dash instance
dash_api_key: str
valid api key for Dash API
export_csvs:
(optional)
If True, data successfully uploaded is exported as csv
Defaults to True i.e. output is included
dtypes: dict
(optional)
A dictionary that contains the column name and data type to convert to.
Defaults to None i.e. load dataframe columns as they are.
chunksize: int
(optional)
the maximum number of lines to be included in each file.
Note that this should be low enough that the zipped file is less
than Dash maximum gzipped file size.
Defaults to 50000.
sep: str
(optional)
Field delimiter for ComoDash table to upload the dataframe to.
Defaults to \t (tab).
output_path: str
(optional)
if specified, output csvs are saved in that location. If not, output is placed in the same location as the script
Defaults to None
include_snapshot: bool
(optional)
If True, an additional column 'snapshot_timestamp' will be added to the DataFrame.
This column will contain the time that data is loaded in "%Y-%m-%d %H:%M:%S.%f"
format in order to help with database management
Defaults to True i.e. snapshot_timestamp is included
max_tries: int
(optional)
Maximum number of times to retry if there is an HTTP error
Defaults to 5
Returns
-------
pd.DataFrame
DataFrame with errors
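Example
-------
A sketch of a typical call (connection details, credentials and query are placeholders):
.. code-block:: python
errors = upload_from_oracle(
sql_host='192.168.0.1',
sql_port=1521,
sql_service_name='myservice',
sql_username='my_user',
sql_password='my_password',
sql_query='select * from policies',
dash_table='inforce_policies',
dash_orgname='myorgname',
dash_api_key='my_api_key'
)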
"""
# Initialize data upload
print("Initializing. This will take a little while..")
url = "https://api.comodash.io/v1/data-input-file"
headers = {
'Content-Type': 'application/gzip',
'service_client_id': '0',
'x-api-key': dash_api_key,
'org-name': dash_orgname,
'table-name': dash_table
}
snapshot_timestamp = datetime.now()
# Create load id
load_id = dash_table + "_" + snapshot_timestamp.strftime("%Y%m%d%H%M")
# Create output folder
if export_csvs:
if output_path:
if not os.path.exists(output_path):
os.mkdir(output_path)
else:
output_path = os.getcwd()
# Create log file
log_file = os.path.join(output_path, load_id + ".log")
formatter = logging.Formatter(
fmt='%(asctime)s - %(name)-12s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d,%H:%M:%S'
)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
# Create logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)
# Create sqlalchemy engine
sql_dsn = cx_Oracle.makedsn(sql_host, sql_port, service_name=sql_service_name)
connection_string = 'oracle://{user}:{password}@{dsn}'.format(user=sql_username, password=sql_password, dsn=sql_dsn)
engine = sqlalchemy.create_engine(connection_string, max_identifier_length=128)
# Connect to database
with engine.connect() as connection:
# Table properties
result = connection.execute(sql_query)
columns = [col for col in result.keys()]
# Calculate results length
length = connection.execute(f"select count(*) from ({sql_query})").fetchall()[0][0]
if length % chunksize == 0:
chunk_number = int(length/chunksize)
else:
chunk_number = int(length/chunksize) + 1
print(f"Starting to upload table with {length} rows in {chunk_number} chunks.")
# Empty list to store chunks that fail to upload
error_chunks = []
with open(os.path.join(output_path, load_id + "_success.csv.gz"), "wb") as f:
for i in tqdm(range(chunk_number)):
# Get data chunk
chunk = pd.DataFrame(result.fetchmany(chunksize), columns=columns)
# Include snapshot time if include_snapshot == True
if include_snapshot:
chunk['snapshot_timestamp'] = snapshot_timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")
# Change columns format if dtypes is specified
if dtypes:
chunk = chunk.astype(dtype=dtypes)
# Create gz_stream
csv_stream = io.BytesIO()
chunk.to_csv(
csv_stream,
compression="gzip",
encoding="utf-8",
index=False,
quoting=csv.QUOTE_MINIMAL,
sep=sep
)
trial = 1
while True:
try:
response = requests.request(
method="POST",
url=url,
headers=headers,
data=csv_stream.getbuffer()
)
response.raise_for_status()
if response.status_code == 200:
logger.info(response.text)
trial = 1
if export_csvs:
f.write(csv_stream.getvalue())
break
except Exception:
if trial >= max_tries:
logger.info(response.text)
error_chunks.append(chunk)
trial = 1
break
else:
logger.info(response.text)
if export_csvs:
f.write(csv_stream.getvalue())
trial += 1
print(f"Data uploaded with {len(error_chunks)} error files")
# Combine error chunks and export them as csv
if len(error_chunks) > 0:
print("Exporting errors...")
error_df = pd.concat(error_chunks)
error_df.to_csv(
os.path.join(output_path, load_id + "_fail.csv.gz"),
compression="gzip",
index=False,
sep=sep
)
return error_df
else:
return None