Skip to content

Commit

Permalink
Bump pymysql version (#116)
Browse files Browse the repository at this point in the history
* monkey patch date and datetime conversions

* bump pymysql version

* add test

* test for times as well

* update comment and test

Co-authored-by: nick-mccoy <[email protected]>
  • Loading branch information
nick-mccoy and nick-mccoy authored Mar 16, 2020
1 parent 1f2bb47 commit f001a01
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 16 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
'attrs==16.3.0',
'pendulum==1.2.0',
'singer-python==5.5.0',
'PyMySQL==0.7.11',
'PyMySQL==0.9.3',
'backoff==1.3.2',
'mysql-replication==0.18',
],
Expand Down
15 changes: 0 additions & 15 deletions tap_mysql/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,6 @@
# We need to hold onto this for self-signed SSL
match_hostname = ssl.match_hostname

# MySQL 8.0 Patch:
# Workaround to support MySQL 8.0 without upgrading the PyMySQL version
# since there are breaking changes between these versions, this should suffice to allow
# new character sets to be used with MySQL 8.0 instances.
# FIXME: Remove when PyMYSQL upgrade behavior has been evaluated.
# Patch Originally Found Here: https://github.com/PyMySQL/PyMySQL/pull/592
original_charset_by_id = pymysql.charset.charset_by_id
def charset_wrapper(*args, **kwargs):
unknown_charset = pymysql.charset.Charset(None, None, None, None)
try:
return original_charset_by_id(*args, **kwargs)
except KeyError:
return unknown_charset
pymysql.connections.charset_by_id = charset_wrapper

@backoff.on_exception(backoff.expo,
(pymysql.err.OperationalError),
max_tries=5,
Expand Down
25 changes: 25 additions & 0 deletions tap_mysql/sync_strategies/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,38 @@
import time
import tzlocal
import pytz
import pymysql

import singer.metrics as metrics
from singer import metadata
from singer import utils

LOGGER = singer.get_logger()


# NB: Upgrading pymysql from 0.7.11 --> 0.9.3 had the undocumented change
# to how `0000-00-00 00:00:00` date/time types are returned. In 0.7.11,
# they are returned as NULL, and in 0.9.3, they are returned as the string
# `0000-00-00 00:00:00`. To maintain backwards-compatability, we are
# monkey patching the functions so they continue returning None
original_convert_datetime = pymysql.converters.convert_datetime
original_convert_date = pymysql.converters.convert_date

def monkey_patch_datetime(datetime_str):
value = original_convert_datetime(datetime_str)
if datetime_str == value:
return None
return value

def monkey_patch_date(date_str):
value = original_convert_date(date_str)
if date_str == value:
return None
return value

pymysql.converters.convert_datetime = monkey_patch_datetime
pymysql.converters.convert_date = monkey_patch_date

def escape(string):
if '`' in string:
raise Exception("Can't escape identifier {} because it contains a backtick"
Expand Down
122 changes: 122 additions & 0 deletions tests/test_date_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import unittest
import pymysql
import tap_mysql
import copy
import singer
import os
import singer.metadata
from tap_mysql.connection import connect_with_backoff

try:
import tests.utils as test_utils
except ImportError:
import utils as test_utils

import tap_mysql.sync_strategies.binlog as binlog
import tap_mysql.sync_strategies.common as common

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import RotateEvent
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)

from singer.schema import Schema

LOGGER = singer.get_logger()

SINGER_MESSAGES = []

def accumulate_singer_messages(message):
SINGER_MESSAGES.append(message)

singer.write_message = accumulate_singer_messages

class TestDateTypes(unittest.TestCase):

def setUp(self):
self.conn = test_utils.get_test_connection()
self.state = {}

log_file, log_pos = binlog.fetch_current_log_file_and_pos(self.conn)

with connect_with_backoff(self.conn) as open_conn:
with open_conn.cursor() as cursor:
cursor.execute('CREATE TABLE datetime_types (id int, datetime_col datetime, timestamp_col timestamp, time_col time)')
cursor.execute('INSERT INTO datetime_types (id, datetime_col, timestamp_col, time_col) VALUES (1, \'0000-00-00\', \'0000-00-00 00:00:00\', \'00:00:00\' )')
cursor.execute('INSERT INTO datetime_types (id, datetime_col, timestamp_col, time_col) VALUES (2, NULL, NULL, NULL)')
open_conn.commit()

self.catalog = test_utils.discover_catalog(self.conn, {})

for stream in self.catalog.streams:
stream.stream = stream.table

stream.metadata = [
{'breadcrumb': (),
'metadata': {
'selected': True,
'database-name': 'tap_mysql_test',
'table-key-propertes': ['id']
}},
{'breadcrumb': ('properties', 'id'), 'metadata': {'selected': True}},
{'breadcrumb': ('properties', 'datetime_col'), 'metadata': {'selected': True}},
{'breadcrumb': ('properties', 'timestamp_col'), 'metadata': {'selected': True}},
{'breadcrumb': ('properties', 'time_col'), 'metadata': {'selected': True}}
]

test_utils.set_replication_method_and_key(stream, 'LOG_BASED', None)

self.state = singer.write_bookmark(self.state,
stream.tap_stream_id,
'log_file',
log_file)

self.state = singer.write_bookmark(self.state,
stream.tap_stream_id,
'log_pos',
log_pos)

self.state = singer.write_bookmark(self.state,
stream.tap_stream_id,
'version',
singer.utils.now())

def test_initial_full_table(self):
state = {}
expected_log_file, expected_log_pos = binlog.fetch_current_log_file_and_pos(self.conn)

global SINGER_MESSAGES
SINGER_MESSAGES.clear()
tap_mysql.do_sync(self.conn, {}, self.catalog, state)

message_types = [type(m) for m in SINGER_MESSAGES]


self.assertEqual(message_types,
[singer.StateMessage,
singer.SchemaMessage,
singer.ActivateVersionMessage,
singer.RecordMessage,
singer.RecordMessage,
singer.StateMessage,
singer.ActivateVersionMessage,
singer.StateMessage])

record_messages = list(filter(lambda m: isinstance(m, singer.RecordMessage), SINGER_MESSAGES))

# Expected from 0.7.11
expected_records = [
{'datetime_col': None,
'id': 1,
'timestamp_col': None,
'time_col': '1970-01-01T00:00:00.000000Z'},
{'datetime_col': None,
'id': 2,
'timestamp_col': None,
'time_col': None}
]

self.assertEqual(expected_records, [x.asdict()['record'] for x in record_messages])

0 comments on commit f001a01

Please sign in to comment.