Query

The hamana library provides many classes to extract data from different sources. The Query class was introduced to provide a common interface for interacting with these classes (connectors) and with the extracted data.

For example, with Database connectors it is used to execute SQL queries, while with File connectors it is used to manage the extracted data. In addition, Query objects are natively connected to the hamana internal database and can be used to perform operations on the extracted data.

Because it is used so frequently, the Query class is exposed in the top-level hamana module and can be imported directly from there. See the API section for all details.

Examples

This example shows how to use the Query class to execute a SQL query on an in-memory database.

import hamana as hm

# connect to in-memory database
hm.connect()

# define and execute a query
query = hm.Query("SELECT * FROM customers")
result = query.execute().result

# DataFrame.info() prints its summary itself and returns None
result.info()

# close connection
hm.disconnect()

This second example shows how to use the Query class to manage the data extracted from a CSV file.

import hamana as hm

# connect to CSV file
customers_csv = hm.connector.file.CSV("customers.csv")
query = customers_csv.execute()

# check results
# DataFrame.info() prints its summary itself and returns None
query.result.info()

API

hamana.connector.db.query.Query

Query(
    query: str | Path,
    columns: list[TColumn] | None = None,
    params: (
        list[QueryParam] | dict[str, ParamValue] | None
    ) = None,
)

Bases: Generic[TColumn]

Class to represent a query object.

Source code in src/hamana/connector/db/query.py
def __init__(
    self,
    query: str | Path,
    columns: list[TColumn] | None = None,
    params: list[QueryParam] | dict[str, ParamValue] | None = None
) -> None:

    # setup query
    if isinstance(query, Path):
        logger.info(f"loading query from file: {query}")

        if not query.exists():
            raise QueryInitializationError(f"file {query} not found")

        self.query = query.read_text()
    elif isinstance(query, str):
        if Path(query).exists():
            logger.info(f"loading query from file: {query}")
            with open(query, "r") as f:
                self.query = f.read()
        else:
            self.query = query

    self.columns = columns
    self.params = params

query instance-attribute

query: str

Query to be executed in the database. The SQL query can be provided directly as a string, or loaded from a file by providing the file path.
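
The string-or-path resolution can be sketched with the standard library alone. This is a simplified stand-in for illustration, not the library code: `resolve_query_text` is a hypothetical helper, and it raises `FileNotFoundError` where the constructor shown above raises `QueryInitializationError`.

```python
from __future__ import annotations

import tempfile
from pathlib import Path


def resolve_query_text(query: str | Path) -> str:
    """Hypothetical helper: return SQL text from a literal string or a file."""
    if isinstance(query, Path):
        # An explicit Path must point to an existing file.
        if not query.exists():
            raise FileNotFoundError(f"file {query} not found")
        return query.read_text()
    # A plain string is read as a file only if such a file exists;
    # otherwise it is taken to be the SQL text itself.
    if Path(query).exists():
        return Path(query).read_text()
    return query


with tempfile.TemporaryDirectory() as tmp:
    sql_file = Path(tmp) / "customers.sql"
    sql_file.write_text("SELECT * FROM customers")

    from_literal = resolve_query_text("SELECT 1")
    from_path = resolve_query_text(sql_file)
    from_path_string = resolve_query_text(str(sql_file))
```

One consequence of this design is that a string which happens to match an existing file name is always read as a file, so passing a `Path` explicitly is the less ambiguous option.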

params class-attribute instance-attribute

params: list[QueryParam] | dict[str, ParamValue] | None = (
    None
)

List of parameters used in the query. The parameters are replaced by their values when the query is executed.

columns class-attribute instance-attribute

columns: list[TColumn] | None = None

Definition of the columns returned by the query. The columns are used to map the query result to the application data. If not provided, then the columns are inferred from the result.

flag_executed class-attribute instance-attribute

flag_executed: bool = False

Flag to indicate if the query has been executed.

result property writable

result: pd.DataFrame

Result of the query execution. The result is a pandas.DataFrame whose columns are those defined in the columns attribute, or inferred from the extraction.

Raises:

  • QueryResultNotAvailable: if no result is available; e.g., the query has not been executed.

get_params

get_params() -> dict[str, ParamValue] | None

Returns the query parameters as a dictionary. Returns None if there are no parameters.

Source code in src/hamana/connector/db/query.py
def get_params(self) -> dict[str, ParamValue] | None:
    """
        Returns the query parameters as a dictionary.
        Returns `None` if there are no parameters.
    """
    if isinstance(self.params, list):
        _params = {param.name : param.value for param in self.params}
    else:
        _params = self.params
    return _params
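
A minimal, self-contained sketch of the same normalization is shown below. `Param` and `params_as_dict` are hypothetical stand-ins for `QueryParam` and `get_params`, and binding the resulting dictionary to named placeholders through `sqlite3` is an assumption made for illustration; the source does not state how hamana substitutes parameters.

```python
from __future__ import annotations

import sqlite3
from dataclasses import dataclass


@dataclass
class Param:
    """Stand-in for QueryParam: a name and its value."""
    name: str
    value: object


def params_as_dict(params: list[Param] | dict[str, object] | None) -> dict[str, object] | None:
    """Turn a list of parameters into a dict; pass dicts and None through."""
    if isinstance(params, list):
        return {param.name: param.value for param in params}
    return params


as_dict = params_as_dict([Param("min_age", 18), Param("country", "IT")])

# Assumption: the driver binds the dict to named placeholders (:name).
conn = sqlite3.connect(":memory:")
row = conn.execute("SELECT :min_age, :country", as_dict).fetchone()
conn.close()
```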

to_sqlite

to_sqlite(
    table_name: str,
    mode: SQLiteDataImportMode = SQLiteDataImportMode.REPLACE,
) -> None

This function is used to insert the query result into a table hosted on the hamana internal database (HamanaConnector).

The hamana db is a SQLite database, so bool, datetime and timestamp data types are not supported. If any column is defined with one of these data types, the method automatically converts it to a SQLite data type.

In particular, the conversions are:

  • bool columns are mapped to INTEGER data type, with the values True and False converted to 1 and 0.
  • date and datetime columns are mapped to INTEGER data type, with the values converted to an int number using the following format: YYYYMMDDHHmmss for datetime, YYYYMMDD for date.
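
The conversions listed above can be illustrated on single values with the standard library. This is only a sketch: the library applies the equivalent conversion column by column on the pandas DataFrame, and `to_sqlite_value` is a hypothetical helper that is not part of hamana.

```python
from datetime import date, datetime


def to_sqlite_value(value):
    """Convert bool, date and datetime values to the integer encodings described above."""
    if isinstance(value, bool):
        return int(value)
    # datetime is a subclass of date, so it has to be checked first.
    if isinstance(value, datetime):
        return int(value.strftime("%Y%m%d%H%M%S"))
    if isinstance(value, date):
        return int(value.strftime("%Y%m%d"))
    # Any other value is left untouched.
    return value


converted = [
    to_sqlite_value(True),
    to_sqlite_value(date(2024, 3, 9)),
    to_sqlite_value(datetime(2024, 3, 9, 14, 5, 30)),
]
# converted == [1, 20240309, 20240309140530]
```

Because the digits run from the most significant unit (year) to the least significant one, comparing the integers orders the values chronologically.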

Parameters:

  • table_name (str, required): name of the table to create in the database. By assumption, the table's name is converted to uppercase.
  • mode (SQLiteDataImportMode, default: SQLiteDataImportMode.REPLACE): mode of importing the data into the database.

Source code in src/hamana/connector/db/query.py
def to_sqlite(self, table_name: str, mode: SQLiteDataImportMode = SQLiteDataImportMode.REPLACE) -> None:
    """
        This function is used to insert the query result into a 
        table hosted on the `hamana` internal database (`HamanaConnector`).

        The `hamana` db is a SQLite database, for this reason 
        `bool`, `datetime` and `timestamp` data types are **not** supported.
        If some of the columns are defined with these data types, 
        then the method performs an automatic conversion to a SQLite data type.

        In particular, the conversions are:

        - `bool` columns are mapped to `INTEGER` data type, with the values 
        `True` and `False` converted to `1` and `0`.
        - `date` and `datetime` columns are mapped to `INTEGER` datatype, with the values 
            converted to an int number using the following format: `YYYYMMDDHHmmss`
            for `datetime`, `YYYYMMDD` for `date`.

        Parameters:
            table_name: name of the table to create into the database.
                By assumption, the table's name is converted to uppercase.
            mode: mode of importing the data into the database.
    """
    logger.debug("start")
    df_insert = self.result.copy()

    # set dtype
    columns_dtypes: dict | None = None
    if self.columns is not None:
        columns_dtypes = {}
        for column in self.columns:
            columns_dtypes[column.name] = DataType.to_sqlite(column.dtype)

            # convert columns
            match column.dtype:
                case DataType.BOOLEAN:
                    df_insert[column.name] = df_insert[column.name].astype(int)
                case DataType.DATE:
                    df_insert[column.name] = df_insert[column.name].dt.strftime("%Y%m%d").astype(int)
                case DataType.DATETIME:
                    df_insert[column.name] = df_insert[column.name].dt.strftime("%Y%m%d%H%M%S").astype(int)

    # import internal database
    from .hamana import HamanaConnector

    # get instance
    db = HamanaConnector.get_instance()

    # insert data
    table_name_upper = table_name.upper()
    try:
        with db:
            logger.debug(f"inserting data into table {table_name_upper}")
            logger.debug(f"mode: {mode.value}")
            df_insert.to_sql(
                name = table_name_upper,
                con = db.connection,
                if_exists = mode.value,
                dtype = columns_dtypes,
                index = False
            )
            logger.info(f"data inserted into table {table_name_upper}")
    except Exception as e:
        logger.error(f"error inserting data into table {table_name_upper}")
        logger.exception(e)
        raise e

    logger.debug("end")
    return

get_insert_query

get_insert_query(table_name: str) -> str

This function returns a query to insert the query result into a table.

Parameters:

  • table_name (str, required): name of the table to insert the data into. By assumption, the table's name is converted to uppercase.

Returns:

  • str: query to insert the data into the table.

Source code in src/hamana/connector/db/query.py
def get_insert_query(self, table_name: str) -> str:
    """
        This function returns a query to insert the query result into a table.

        Parameters:
            table_name: name of the table to insert the data.
                By assumption, the table's name is converted to uppercase.

        Returns:
            query to insert the data into the table.
    """
    logger.debug("start")

    # check columns availability
    if self.columns is None:
        logger.error("no columns available")
        raise QueryColumnsNotAvailable("no columns available")

    # get columns
    columns = ", ".join([column.name for column in self.columns])
    logger.debug("columns string created")

    # get values
    values = ", ".join(["?" for _ in self.columns])
    logger.debug("values string created")

    # build query
    table_name_upper = table_name.upper()
    query = f"INSERT INTO {table_name_upper} ({columns}) VALUES ({values})"
    logger.info(f"query to insert data into table {table_name_upper} created")
    logger.info(f"query: {query}")

    logger.debug("end")
    return query
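
The statement built above uses one `?` placeholder per column, which is the positional parameter style accepted by Python's `sqlite3` module. The sketch below rebuilds the same string from plain column names and runs it against an in-memory database; `build_insert_query` and the sample table are hypothetical and only serve the illustration.

```python
from __future__ import annotations

import sqlite3


def build_insert_query(table_name: str, column_names: list[str]) -> str:
    """Build a parameterized INSERT statement with one '?' per column."""
    columns = ", ".join(column_names)
    placeholders = ", ".join("?" for _ in column_names)
    return f"INSERT INTO {table_name.upper()} ({columns}) VALUES ({placeholders})"


insert_sql = build_insert_query("customers", ["id", "name"])

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE CUSTOMERS (id INTEGER, name TEXT)")
# executemany binds each tuple to the placeholders, row by row.
conn.executemany(insert_sql, [(1, "Ada"), (2, "Grace")])
rows = conn.execute("SELECT id, name FROM CUSTOMERS ORDER BY id").fetchall()
conn.close()
```

Only the values go through placeholders; the table and column names are interpolated into the string, so they should come from trusted definitions rather than user input.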

get_create_query

get_create_query(table_name: str) -> str

This function returns a query to create a table based on the query result.

Parameters:

  • table_name (str, required): name of the table to be created. By assumption, the table's name is converted to uppercase.

Returns:

  • str: query to create the table.

Source code in src/hamana/connector/db/query.py
def get_create_query(self, table_name: str) -> str:
    """
        This function returns a query to create a table based on the query result.

        Parameters:
            table_name: name of the table to be created.
                By assumption, the table's name is converted to uppercase.

        Returns:
            query to create the table.
    """
    logger.debug("start")

    # check columns availability
    if self.columns is None:
        logger.error("no columns available")
        raise QueryColumnsNotAvailable("no columns available")

    # build query
    table_name_upper = table_name.upper()
    query = "CREATE TABLE " + table_name_upper + " (\n" + \
            "".rjust(4) + ", ".rjust(4).join([f"{column.name} {DataType.to_sqlite(column.dtype)}\n" for column in self.columns]) + \
            ")"
    logger.info(f"query to create table {table_name_upper} created")
    logger.info(f"query: {query}")

    logger.debug("end")
    return query
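
To make the layout of the generated statement concrete, the sketch below reproduces the same string concatenation with plain `(name, SQLite type)` pairs in place of `Column` objects and `DataType.to_sqlite`. `build_create_query` is a hypothetical helper written for this illustration.

```python
from __future__ import annotations

import sqlite3


def build_create_query(table_name: str, columns: list[tuple[str, str]]) -> str:
    """Build a CREATE TABLE statement with one column per line, leading-comma style."""
    lines = [f"{name} {sqlite_type}\n" for name, sqlite_type in columns]
    # "".rjust(4) is four spaces and ", ".rjust(4) is "  , ".
    return "CREATE TABLE " + table_name.upper() + " (\n" + "    " + "  , ".join(lines) + ")"


create_sql = build_create_query("customers", [("id", "INTEGER"), ("name", "TEXT")])

# The statement is valid SQLite and can be executed as is.
conn = sqlite3.connect(":memory:")
conn.execute(create_sql)
created_columns = [row[1] for row in conn.execute("PRAGMA table_info(CUSTOMERS)")]
conn.close()
```

For the two sample columns the result is `CREATE TABLE CUSTOMERS (`, then `    id INTEGER`, then `  , name TEXT`, then `)`, each on its own line.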

get_column_names

get_column_names() -> list[str]

This function returns the column names of the query.

Returns:

  • list[str]: list of column names.

Source code in src/hamana/connector/db/query.py
def get_column_names(self) -> list[str]:
    """
        This function returns the column names of the query.

        Returns:
            list of column names.
    """
    logger.debug("start")

    # check columns availability
    if self.columns is None:
        logger.error("no columns available")
        raise QueryColumnsNotAvailable("no columns available")
    columns = [column.name for column in self.columns]

    logger.debug("end")
    return columns

adjust_df

adjust_df(df: pd.DataFrame) -> pd.DataFrame

This function is used to adjust a pandas.DataFrame (usually the result of a query) based on the columns provided.

The function re-orders the columns of the DataFrame and checks the data types; if they do not match, the function tries to convert the column to the requested data type. Columns that have no parser are skipped with a warning.

Parameters:

  • df (pd.DataFrame, required): DataFrame to adjust

Raises:

  • QueryColumnsNotAvailable: if the columns do not match between the query and the result.
  • ColumnDataTypeConversionError: if there is an error during the data type conversion.

Source code in src/hamana/connector/db/query.py
def adjust_df(self, df: pd.DataFrame) -> pd.DataFrame:
    """
        This function is used to adjust a `pandas.DataFrame` (usually 
        the result of a query) based on the columns provided.

        The function re-orders the columns of the `DataFrame` 
        and checks the data types; if they do not match, then 
        the function will try to convert the requested one.

        Parameters:
            df: DataFrame to adjust

        Raises:
            QueryColumnsNotAvailable: if the columns do not match between the query and the result.
            ColumnDataTypeConversionError: if there is an error during the data type conversion.
    """
    logger.debug("start")

    # check columns availability
    if self.columns is None:
        logger.error("no columns available")
        raise QueryColumnsNotAvailable("no columns available")
    columns = self.columns

    # get columns
    columns_query = []
    columns_df = df.columns.to_list()

    logger.info("get query columns ordered")
    for col in sorted(columns, key = lambda col: col.order if col.order is not None else 0):
        columns_query.append(col.name)

    # check columns_query is a subset of columns_df
    if not set(columns_query).issubset(columns_df):
        logger.error("columns do not match between query and results")
        raise QueryColumnsNotAvailable(f"columns do not match {set(columns_query).difference(columns_df)}")

    # re-order
    if columns_query != columns_df:
        logger.info("re-ordering columns")
        logger.info(f"order > {columns_query}")
        df = df.copy()[columns_query]
    else:
        logger.info("columns already in the correct order")

    # check data types
    logger.info("check data types")
    dtypes_df = df.dtypes
    for column in columns:

        dtype_query = column.dtype
        dtype_df = DataType.from_pandas(dtypes_df[column.name].name)
        logger.debug(f"column: {column.name}")
        logger.debug(f"datatype (query): {dtype_query}")
        logger.debug(f"datatype (df): {dtype_df}")

        if dtype_query != dtype_df:
            try:
                logger.info(f"different datatype for '{column.name}' column -> (query) {dtype_query} != (df) {dtype_df}")

                if column.parser is None:
                    logger.warning(f"no parser available for {column.name} (order: {column.order})")
                    logger.warning("skip column")
                    continue

                df[column.name] = column.parser.pandas(df[column.name])
            except Exception as e:
                logger.error("ERROR: on datatype change")
                logger.error(e)
                raise ColumnDataTypeConversionError(f"ERROR: on datatype change for {column.name} (order: {column.order})")

    logger.debug("end")
    return df
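
The ordering and subset check at the start of this method can be sketched without pandas. `Col` and `ordered_names` are hypothetical stand-ins, and `KeyError` replaces `QueryColumnsNotAvailable` so the example stays self-contained.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Col:
    """Stand-in for a column definition: a name and an optional position."""
    name: str
    order: int | None = None


def ordered_names(columns: list[Col], available: list[str]) -> list[str]:
    """Return the requested column names sorted by order, checking they all exist."""
    # Columns without an explicit order get key 0, so they sort first.
    wanted = [
        col.name
        for col in sorted(columns, key=lambda col: col.order if col.order is not None else 0)
    ]
    missing = set(wanted).difference(available)
    if missing:
        raise KeyError(f"columns do not match {missing}")
    return wanted


columns = [Col("name", 2), Col("id", 1), Col("note")]
selected = ordered_names(columns, ["id", "name", "note", "extra"])
# selected == ["note", "id", "name"]
```

The requested columns only need to be a subset of the available ones: `extra` is present in the data but not requested, so it is dropped from the selection.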

hamana.connector.db.query.QueryParam dataclass

QueryParam(name: str, value: ParamValue)

Class to represent a parameter used in a query.
A parameter is represented by a name and its value.

Usually, parameters are used to define general query conditions and are replaced by the actual values when the query is executed.

name instance-attribute

name: str

Name of the parameter.

value instance-attribute

value: ParamValue

Value of the parameter.

hamana.connector.db.query.TColumn module-attribute

TColumn = TypeVar('TColumn', bound=Column, covariant=True)