-
Notifications
You must be signed in to change notification settings - Fork 208
/
Copy pathdb_api_extractor.py
83 lines (68 loc) · 2.3 KB
/
db_api_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import importlib
import logging
from typing import Any, Iterable
from pyhocon import ConfigTree
from databuilder.extractor.base_extractor import Extractor
LOGGER = logging.getLogger(__name__)
class DBAPIExtractor(Extractor):
"""
Generic DB API extractor.
"""
CONNECTION_CONFIG_KEY = 'connection'
SQL_CONFIG_KEY = 'sql'
def init(self, conf: ConfigTree) -> None:
"""
Receives a {Connection} object and {sql} to execute.
An optional model class can be passed, in which, sql result row
would be converted to a class instance and returned to calling
function
:param conf:
:return:
"""
self.conf = conf
self.connection: Any = conf.get(DBAPIExtractor.CONNECTION_CONFIG_KEY)
self.cursor = self.connection.cursor()
self.sql = conf.get(DBAPIExtractor.SQL_CONFIG_KEY)
model_class = conf.get('model_class', None)
if model_class:
module_name, class_name = model_class.rsplit(".", 1)
mod = importlib.import_module(module_name)
self.model_class = getattr(mod, class_name)
self._iter = iter(self._execute_query())
def _execute_query(self) -> Iterable[Any]:
"""
Use cursor to execute the {sql}
:return:
"""
LOGGER.info('Executing query: \n%s', self.sql)
self.cursor.execute(self.sql)
return self.cursor.fetchall()
def extract(self) -> Any:
"""
Fetch one sql result row, convert to {model_class} if specified before
returning.
:return:
"""
try:
result = next(self._iter)
except StopIteration:
return None
if hasattr(self, 'model_class'):
obj = self.model_class(*result[:len(result)])
return obj
else:
return result
def close(self) -> None:
"""
close cursor and connection handlers
:return:
"""
try:
self.cursor.close()
self.connection.close()
except Exception as e:
LOGGER.warning("Exception encountered while closing up connection handler!", e)
def get_scope(self) -> str:
return 'extractor.dbapi'