diff --git a/setup.py b/setup.py index c96bdd4..715c9c6 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ 'attrs==16.3.0', 'pendulum==1.2.0', 'singer-python==5.5.0', - 'PyMySQL==0.7.11', + 'PyMySQL==0.9.3', 'backoff==1.3.2', 'mysql-replication==0.18', ], diff --git a/tap_mysql/connection.py b/tap_mysql/connection.py index d213125..2c890b1 100644 --- a/tap_mysql/connection.py +++ b/tap_mysql/connection.py @@ -16,21 +16,6 @@ # We need to hold onto this for self-signed SSL match_hostname = ssl.match_hostname -# MySQL 8.0 Patch: -# Workaround to support MySQL 8.0 without upgrading the PyMySQL version -# since there are breaking changes between these versions, this should suffice to allow -# new character sets to be used with MySQL 8.0 instances. -# FIXME: Remove when PyMYSQL upgrade behavior has been evaluated. -# Patch Originally Found Here: https://github.com/PyMySQL/PyMySQL/pull/592 -original_charset_by_id = pymysql.charset.charset_by_id -def charset_wrapper(*args, **kwargs): - unknown_charset = pymysql.charset.Charset(None, None, None, None) - try: - return original_charset_by_id(*args, **kwargs) - except KeyError: - return unknown_charset -pymysql.connections.charset_by_id = charset_wrapper - @backoff.on_exception(backoff.expo, (pymysql.err.OperationalError), max_tries=5, diff --git a/tap_mysql/sync_strategies/common.py b/tap_mysql/sync_strategies/common.py index 1e412fa..d64341f 100644 --- a/tap_mysql/sync_strategies/common.py +++ b/tap_mysql/sync_strategies/common.py @@ -7,6 +7,7 @@ import time import tzlocal import pytz +import pymysql import singer.metrics as metrics from singer import metadata @@ -14,6 +15,30 @@ LOGGER = singer.get_logger() + +# NB: Upgrading pymysql from 0.7.11 --> 0.9.3 had the undocumented change +# to how `0000-00-00 00:00:00` date/time types are returned. In 0.7.11, +# they are returned as NULL, and in 0.9.3, they are returned as the string +# `0000-00-00 00:00:00`. 
To maintain backwards-compatibility, we are +# monkey patching the functions so they continue returning None +original_convert_datetime = pymysql.converters.convert_datetime +original_convert_date = pymysql.converters.convert_date + +def monkey_patch_datetime(datetime_str): + value = original_convert_datetime(datetime_str) + if datetime_str == value: + return None + return value + +def monkey_patch_date(date_str): + value = original_convert_date(date_str) + if date_str == value: + return None + return value + +pymysql.converters.convert_datetime = monkey_patch_datetime +pymysql.converters.convert_date = monkey_patch_date + def escape(string): if '`' in string: raise Exception("Can't escape identifier {} because it contains a backtick" diff --git a/tests/test_date_types.py b/tests/test_date_types.py new file mode 100644 index 0000000..fd8d5ac --- /dev/null +++ b/tests/test_date_types.py @@ -0,0 +1,122 @@ +import unittest +import pymysql +import tap_mysql +import copy +import singer +import os +import singer.metadata +from tap_mysql.connection import connect_with_backoff + +try: + import tests.utils as test_utils +except ImportError: + import utils as test_utils + +import tap_mysql.sync_strategies.binlog as binlog +import tap_mysql.sync_strategies.common as common + +from pymysqlreplication import BinLogStreamReader +from pymysqlreplication.event import RotateEvent +from pymysqlreplication.row_event import ( + DeleteRowsEvent, + UpdateRowsEvent, + WriteRowsEvent, + ) + +from singer.schema import Schema + +LOGGER = singer.get_logger() + +SINGER_MESSAGES = [] + +def accumulate_singer_messages(message): + SINGER_MESSAGES.append(message) + +singer.write_message = accumulate_singer_messages + +class TestDateTypes(unittest.TestCase): + + def setUp(self): + self.conn = test_utils.get_test_connection() + self.state = {} + + log_file, log_pos = binlog.fetch_current_log_file_and_pos(self.conn) + + with connect_with_backoff(self.conn) as open_conn: + with open_conn.cursor() as 
cursor: + cursor.execute('CREATE TABLE datetime_types (id int, datetime_col datetime, timestamp_col timestamp, time_col time)') + cursor.execute('INSERT INTO datetime_types (id, datetime_col, timestamp_col, time_col) VALUES (1, \'0000-00-00\', \'0000-00-00 00:00:00\', \'00:00:00\' )') + cursor.execute('INSERT INTO datetime_types (id, datetime_col, timestamp_col, time_col) VALUES (2, NULL, NULL, NULL)') + open_conn.commit() + + self.catalog = test_utils.discover_catalog(self.conn, {}) + + for stream in self.catalog.streams: + stream.stream = stream.table + + stream.metadata = [ + {'breadcrumb': (), + 'metadata': { + 'selected': True, + 'database-name': 'tap_mysql_test', + 'table-key-propertes': ['id'] + }}, + {'breadcrumb': ('properties', 'id'), 'metadata': {'selected': True}}, + {'breadcrumb': ('properties', 'datetime_col'), 'metadata': {'selected': True}}, + {'breadcrumb': ('properties', 'timestamp_col'), 'metadata': {'selected': True}}, + {'breadcrumb': ('properties', 'time_col'), 'metadata': {'selected': True}} + ] + + test_utils.set_replication_method_and_key(stream, 'LOG_BASED', None) + + self.state = singer.write_bookmark(self.state, + stream.tap_stream_id, + 'log_file', + log_file) + + self.state = singer.write_bookmark(self.state, + stream.tap_stream_id, + 'log_pos', + log_pos) + + self.state = singer.write_bookmark(self.state, + stream.tap_stream_id, + 'version', + singer.utils.now()) + + def test_initial_full_table(self): + state = {} + expected_log_file, expected_log_pos = binlog.fetch_current_log_file_and_pos(self.conn) + + global SINGER_MESSAGES + SINGER_MESSAGES.clear() + tap_mysql.do_sync(self.conn, {}, self.catalog, state) + + message_types = [type(m) for m in SINGER_MESSAGES] + + + self.assertEqual(message_types, + [singer.StateMessage, + singer.SchemaMessage, + singer.ActivateVersionMessage, + singer.RecordMessage, + singer.RecordMessage, + singer.StateMessage, + singer.ActivateVersionMessage, + singer.StateMessage]) + + record_messages = 
list(filter(lambda m: isinstance(m, singer.RecordMessage), SINGER_MESSAGES)) + + # Expected from 0.7.11 + expected_records = [ + {'datetime_col': None, + 'id': 1, + 'timestamp_col': None, + 'time_col': '1970-01-01T00:00:00.000000Z'}, + {'datetime_col': None, + 'id': 2, + 'timestamp_col': None, + 'time_col': None} + ] + + self.assertEqual(expected_records, [x.asdict()['record'] for x in record_messages])