
Database Connectors

The hamana.connector.db package provides a set of connectors to interact with databases. The package is designed to be extensible and easy to use.

Each connector must inherit from the DatabaseConnectorABC abstract class, which provides a guideline for implementing the connector; see the Interface section for more details. From a high-level perspective, a connector must implement the following methods:

  • _connect: return the Connection object used to interact with the database.
  • __enter__: establish a connection to the database.
  • __exit__: close the connection to the database.
  • ping: check if the connection to the database is still valid.
  • execute: execute a query on the database. This approach suits queries that return a limited number of rows, because the results are stored in memory.
  • batch_execute: similar to execute, but designed for queries that return a large number of rows. The function returns a generator that yields the results in batches of rows.
  • to_sqlite: execute the query and store the results in a dedicated table in the internal hamana SQLite database. This approach is useful for storing the results of a query and using them later in the analysis.

See each method's description in the Interface section for more details.

For every database with a Python library that follows PEP 249 ("Python Database API Specification v2.0"), the hamana.connector.db package provides the BaseConnector class, a base implementation of the DatabaseConnectorABC interface. To implement a new connector, it is recommended to inherit from this class; see the Base Connector section for more details.

The following example shows how to connect to an Oracle database, execute a query, and store the results in the internal hamana SQLite database.

import hamana as hm

# connect hamana database
hm.connect()

# connect Oracle database
oracle_db = hm.connector.db.Oracle.new(
    host = "localhost",
    port = 1521,
    user = "user",
    password = "password"
)

# execute and store a query
oracle_db.to_sqlite(
    query = hm.Query("SELECT * FROM orders"),
    table_name = "orders"
)

# disconnect
hm.disconnect()

The currently available connectors are:

| Source | Path | Connector |
| --- | --- | --- |
| Oracle | hamana.connector.db | Oracle |
| SQLite | hamana.connector.db | SQLite |
| Teradata | hamana.connector.db | Teradata |

Interface

hamana.connector.db.interface.DatabaseConnectorABC

The following abstract class defines a general database connector.
A connector is used in order to connect to a database and perform operations on it.

All hamana connectors must inherit from this class.

connection instance-attribute

connection: ConnectionProtocol

Database connection.

This attribute is used to store the connection to the database and must satisfy the PEP 249 Standard Database API Specification v2.0.

__enter__ abstractmethod

__enter__() -> 'DatabaseConnectorABC'

Open a connection.

Source code in src/hamana/connector/db/interface.py
@abstractmethod
def __enter__(self) -> "DatabaseConnectorABC":
    """Open a connection."""
    raise NotImplementedError

__exit__ abstractmethod

__exit__(
    exc_type: Any, exc_value: Any, traceback: Any
) -> None

Close a connection.

Source code in src/hamana/connector/db/interface.py
@abstractmethod
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
    """Close a connection."""
    raise NotImplementedError

ping abstractmethod

ping() -> None

Function used to check the database connection.

Source code in src/hamana/connector/db/interface.py
@abstractmethod
def ping(self) -> None:
    """Function used to check the database connection."""
    raise NotImplementedError

parse_cursor_description abstractmethod

parse_cursor_description(
    cursor: Cursor,
) -> dict[str, Column | None]

The function takes a cursor object, parses its description, and extracts the corresponding Column objects.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| cursor | Cursor | cursor to parse. | required |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Column \| None] | dictionary containing the columns extracted from the cursor. |

Source code in src/hamana/connector/db/interface.py
@abstractmethod
def parse_cursor_description(self, cursor: Cursor) -> dict[str, Column | None]:
    """
        The function is used to take a cursor object, 
        parse its description, and extract a corresponding list 
        of `Column` objects.

        Parameters:
            cursor: cursor to parse.

        Returns:
            dictionary containing the columns extracted from the cursor.
    """
    raise NotImplementedError
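
As a rough illustration of what this kind of parsing involves, the sketch below reads the PEP 249 `description` attribute of a standard-library sqlite3 cursor. `ColumnSketch` and `parse_description` are hypothetical stand-ins introduced for this example; they are not hamana's `Column` class or its actual implementation.

```python
import sqlite3
from dataclasses import dataclass
from typing import Any


@dataclass
class ColumnSketch:
    """Hypothetical stand-in for hamana's Column class."""
    name: str
    order: int
    type_code: Any  # PEP 249 type_code; sqlite3 always reports None here


def parse_description(cursor: sqlite3.Cursor) -> dict[str, ColumnSketch]:
    """Map each column name in cursor.description to a ColumnSketch."""
    columns: dict[str, ColumnSketch] = {}
    # PEP 249: description is a sequence of 7-item sequences whose first
    # two items are the column name and its type_code. It is None when the
    # statement returned no result set.
    for order, item in enumerate(cursor.description or []):
        name, type_code = item[0], item[1]
        columns[name] = ColumnSketch(name=name, order=order, type_code=type_code)
    return columns


connection = sqlite3.connect(":memory:")
cursor = connection.execute("SELECT 1 AS id, 'widget' AS label")
parsed = parse_description(cursor)
connection.close()
```

Drivers differ in how much they report through `description`, so a real connector would likely need driver-specific handling of the type code.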

get_column_from_dtype abstractmethod

get_column_from_dtype(
    dtype: Any, column_name: str, order: int
) -> Column

This function is designed to create a mapping between the datatypes provided by the different database connectors and standard hamana datatypes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dtype | Any | data type of the column. | required |
| column_name | str | name of the column. | required |
| order | int | order of the column. | required |

Returns:

| Type | Description |
| --- | --- |
| Column | Column object representing the column. |

Raises:

| Type | Description |
| --- | --- |
| ColumnDataTypeConversionError | if the datatype does not have a corresponding mapping. |

Source code in src/hamana/connector/db/interface.py
@abstractmethod
def get_column_from_dtype(self, dtype: Any, column_name: str, order: int) -> Column:
    """
        This function is designed to create a mapping between the 
        datatypes provided by the different database connectors and 
        standard `hamana` datatypes.

        Parameters:
            dtype: data type of the column.
            column_name: name of the column.
            order: order of the column.

        Returns:
            `Column` object representing the column.

        Raises:
            ColumnDataTypeConversionError: if the datatype 
                does not have a corresponding mapping.
    """
    raise NotImplementedError
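
The mapping idea can be sketched with a plain lookup table. Everything named below (`ColumnSketch`, `DataTypeConversionErrorSketch`, `DTYPE_LABELS`, and the label strings) is an assumption made for illustration; the actual hamana datatypes and the real `Column` class are not shown on this page.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any


class DataTypeConversionErrorSketch(Exception):
    """Hypothetical stand-in for ColumnDataTypeConversionError."""


@dataclass
class ColumnSketch:
    """Hypothetical stand-in for hamana's Column class."""
    name: str
    order: int
    dtype: str


# Assumed mapping from driver-reported Python types to illustrative labels.
DTYPE_LABELS: dict[Any, str] = {
    int: "integer",
    float: "number",
    str: "string",
    bool: "boolean",
    datetime: "datetime",
}


def column_from_dtype(dtype: Any, column_name: str, order: int) -> ColumnSketch:
    """Return a ColumnSketch for a known dtype, or raise for an unknown one."""
    try:
        label = DTYPE_LABELS[dtype]
    except KeyError as error:
        raise DataTypeConversionErrorSketch(
            f"no mapping for {dtype!r} (column {column_name!r})"
        ) from error
    return ColumnSketch(name=column_name, order=order, dtype=label)
```

Raising a dedicated error for unmapped types, rather than falling back to a default, makes unsupported columns visible as soon as the cursor is parsed.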

execute abstractmethod

execute(query: Query | str) -> None | Query

Function used to extract data from the database by executing a SELECT query.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | Query \| str | query to execute on database. The query could be a string or a Query object. If the query is a string, then the function automatically creates a Query object. | required |

Returns:

| Type | Description |
| --- | --- |
| None \| Query | The result depends on the input provided. If query is a string, then the function automatically creates a Query object, executes the extraction and returns the Query object with the result. If query is a Query object, then the function performs the extraction and returns None because the result is stored in the object itself. |

Source code in src/hamana/connector/db/interface.py
@abstractmethod
def execute(self, query: Query | str) -> None | Query:
    """
        Function used to extract data from the database by 
        executing a SELECT query.

        Parameters:
            query: query to execute on database. The query could be 
                a string or a `Query` object. If the query is a string, 
                then the function automatically creates a `Query` object.

        Returns:
            The result depends on the input provided. 
                If query is a string, then  the function automatically 
                creates a `Query` object, executes the extraction and 
                returns the `Query` object with the result. 
                If query is a `Query` object, then the function performs 
                the extraction and return None because the result is stored 
                in the object itself.
    """
    raise NotImplementedError
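
The two return behaviours described above can be sketched with the standard-library sqlite3 module. `QuerySketch` is a hypothetical stand-in for hamana's `Query` class, and the free function `execute` below only mimics the documented contract; it is not the library's implementation.

```python
import sqlite3
from dataclasses import dataclass, field
from typing import Any, Optional, Union


@dataclass
class QuerySketch:
    """Hypothetical stand-in for hamana's Query; holds SQL text and rows."""
    sql: str
    rows: list[tuple[Any, ...]] = field(default_factory=list)


def execute(
    connection: sqlite3.Connection, query: Union[QuerySketch, str]
) -> Optional[QuerySketch]:
    """Run a SELECT and keep every returned row in memory."""
    if isinstance(query, str):
        # String input: build the query object here and hand it back.
        created = QuerySketch(query)
        created.rows = connection.execute(created.sql).fetchall()
        return created
    # Object input: fill the caller's object in place and return None.
    query.rows = connection.execute(query.sql).fetchall()
    return None


connection = sqlite3.connect(":memory:")
from_string = execute(connection, "SELECT 1 AS id")
existing = QuerySketch("SELECT 2 AS id")
returned = execute(connection, existing)
connection.close()
```

Because `fetchall` loads the whole result set, this style fits queries with a limited number of rows.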

batch_execute abstractmethod

batch_execute(
    query: Query, batch_size: int
) -> Generator[Any, None, None]

Function used to execute a query on the database and return the results in batches. This approach is used to avoid memory issues when dealing with large datasets.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | Query | query to execute on database. | required |
| batch_size | int | size of the batch to return. | required |

Returns:

| Type | Description |
| --- | --- |
| Generator[Any, None, None] | Generator used to return the results in batches. |

Source code in src/hamana/connector/db/interface.py
@abstractmethod
def batch_execute(self, query: Query, batch_size: int) -> Generator[Any, None, None]:
    """
        Function used to execute a query on the database and return the results in batches. 
        This approach is used to avoid memory issues when dealing with large datasets.

        Parameters:
            query: query to execute on database.
            batch_size: size of the batch to return.

        Returns:
            Generator used to return the results in batches.
    """
    raise NotImplementedError
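
A minimal sketch of batched fetching, assuming a PEP 249 cursor and its `fetchmany` method, shown here with the standard-library sqlite3 module. The free function below takes a raw SQL string instead of a `Query` object and is illustrative only, not hamana's implementation.

```python
import sqlite3
from typing import Any, Generator


def batch_execute(
    connection: sqlite3.Connection, sql: str, batch_size: int
) -> Generator[list[tuple[Any, ...]], None, None]:
    """Yield the result set in lists of at most batch_size rows."""
    cursor = connection.cursor()
    try:
        cursor.execute(sql)
        while True:
            # fetchmany returns an empty list once the result set is exhausted.
            batch = cursor.fetchmany(batch_size)
            if not batch:
                break
            yield batch
    finally:
        cursor.close()


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE orders (id INTEGER)")
connection.executemany("INSERT INTO orders VALUES (?)", [(i,) for i in range(5)])
batch_sizes = [
    len(batch) for batch in batch_execute(connection, "SELECT id FROM orders", 2)
]
connection.close()
```

Only one batch is held in memory at a time, which is the reason to prefer this style for large result sets.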

to_sqlite abstractmethod

to_sqlite(
    query: Query,
    table_name: str,
    raw_insert: bool = False,
    batch_size: int = 10000,
    mode: SQLiteDataImportMode = SQLiteDataImportMode.REPLACE,
) -> None

This function is used to extract data from the database and insert it into the hamana internal database (HamanaConnector).

The hamana db is a SQLite database; for this reason, bool, datetime and timestamp data types are not supported. If any columns are defined with these data types, the method can automatically convert them to a SQLite data type.

The conversions are:

  • bool columns are mapped to INTEGER data type, with the values True and False converted to 1 and 0.
  • datetime columns are mapped to REAL data type, with the values converted to a float number using the following format: YYYYMMDD.HHmmss. Observe that the integer part represents the date in the format YYYYMMDD, while the decimal part represents the time component in the format HHmmss.

By default, the method performs the automatic datatype conversion. Set the raw_insert parameter to True to skip this conversion and improve INSERT efficiency.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | Query | query to execute on database. | required |
| table_name | str | name of the table to insert the data. By assumption, the table's name is converted to uppercase. | required |
| raw_insert | bool | bool value to disable/activate the datatype conversion during the INSERT process. By default, it is set to False. | False |
| batch_size | int | size of the batch used during the inserting process. | 10000 |
| mode | SQLiteDataImportMode | mode of importing the data into the database. | SQLiteDataImportMode.REPLACE |

Source code in src/hamana/connector/db/interface.py
@abstractmethod
def to_sqlite(self, query: Query, table_name: str, raw_insert: bool = False, batch_size: int = 10_000, mode: SQLiteDataImportMode = SQLiteDataImportMode.REPLACE) -> None:
    """
        This function is used to extract data from the database and insert it 
        into the `hamana` internal database (`HamanaConnector`).

        The `hamana` db is a SQLite database, for this reason 
        `bool`, `datetime` and `timestamp` data types are not supported.
        If some of the columns are defined with these data types, 
        then the method could perform an automatic conversion to 
        a SQLite data type.

        The conversions are:

        - `bool` columns are mapped to `INTEGER` data type, with the values 
            `True` and `False` converted to `1` and `0`.
        - `datetime` columns are mapped to `REAL` data type, with the values 
            converted to a float number using the following format: `YYYYMMDD.HHmmss`.
            Observe that the integer part represents the date in the format `YYYYMMDD`,
            while the decimal part represents the time component in the format `HHmmss`.

        By default, the method performs the automatic datatype 
        conversion. However, use the parameter `raw_insert` to 
        **avoid** this conversion and improve the INSERT efficiency. 

        Parameters:
            query: query to execute on database.
            table_name: name of the table to insert the data.
                By assumption, the table's name is converted to uppercase.
            raw_insert: bool value to disable/activate the datatype 
                conversion during the INSERT process. By default, it is 
                set to `False`.
            batch_size: size of the batch used during the inserting process.
            mode: mode of importing the data into the database.
    """
    raise NotImplementedError
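
The two conversions described above can be worked through in plain Python. The helper names `datetime_to_real` and `bool_to_integer` are hypothetical and only follow the documented `YYYYMMDD.HHmmss` format; hamana's own conversion code may differ.

```python
from datetime import datetime


def datetime_to_real(value: datetime) -> float:
    """Encode a datetime as a float in the form YYYYMMDD.HHmmss."""
    # Integer part: the date as YYYYMMDD.
    date_part = value.year * 10_000 + value.month * 100 + value.day
    # Decimal part: the time as HHmmss, shifted six places to the right.
    time_part = value.hour * 10_000 + value.minute * 100 + value.second
    return date_part + time_part / 1_000_000


def bool_to_integer(value: bool) -> int:
    """Encode True as 1 and False as 0."""
    return int(value)


# 15 March 2024 at 14:30:45 becomes approximately 20240315.143045.
encoded = datetime_to_real(datetime(2024, 3, 15, 14, 30, 45))
```

In this sketch, anything below one second is dropped, and the value is subject to ordinary floating-point rounding, so comparisons should allow a small tolerance.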

Base Connector

The BaseConnector class provides a base implementation of the DatabaseConnectorABC interface. It is recommended to inherit from this class to implement new connectors that follow PEP 249, the "Python Database API Specification v2.0".

The only methods that must be implemented are:

  • __init__: method that should initialize the connection to the database.
  • _connect: method returning the Connection object used to interact with the database.

hamana.connector.db.base.BaseConnector

Bases: DatabaseConnectorABC

Class representing a generic database connector that can be used to define new database connectors satisfying the Python Database API Specification v2.0.

New connectors that inherit from this class must implement the _connect method, returning a connection object that satisfies PEP 249.
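
The subclassing pattern can be sketched without hamana installed. `BaseConnectorSketch` below is a hypothetical stand-in for BaseConnector, written under the assumption that the base class builds its context-manager behaviour on top of `_connect`; `InMemorySQLite` is likewise an invented example subclass.

```python
import sqlite3
from abc import ABC, abstractmethod
from typing import Any


class BaseConnectorSketch(ABC):
    """Hypothetical stand-in for BaseConnector: shared logic built on _connect."""

    connection: Any

    @abstractmethod
    def _connect(self) -> Any:
        """Return a PEP 249 connection object."""

    def __enter__(self) -> "BaseConnectorSketch":
        # Open the connection through the subclass-specific hook.
        self.connection = self._connect()
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        self.connection.close()


class InMemorySQLite(BaseConnectorSketch):
    """Example subclass: only __init__ and _connect are written by hand."""

    def __init__(self, path: str = ":memory:") -> None:
        self.path = path

    def _connect(self) -> sqlite3.Connection:
        return sqlite3.connect(self.path)


with InMemorySQLite() as db:
    rows = db.connection.execute("SELECT 42").fetchall()
```

Keeping the driver-specific part in `_connect` means a new database only needs its connection settings and a single call into its PEP 249 library.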