Skip to content

Commit

Permalink
Duplicate Connection: Added logic to query if a connection id exists …
Browse files Browse the repository at this point in the history
…before creating one (#18161)
  • Loading branch information
subkanthi authored Oct 9, 2021
1 parent f248a21 commit 3ddb365
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 22 deletions.
65 changes: 43 additions & 22 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3413,34 +3413,55 @@ def action_mulduplicate(self, connections, session=None):
for selected_conn in connections:
new_conn_id = selected_conn.conn_id
match = re.search(r"_copy(\d+)$", selected_conn.conn_id)

base_conn_id = selected_conn.conn_id
if match:
conn_id_prefix = selected_conn.conn_id[: match.start()]
new_conn_id = f"{conn_id_prefix}_copy{int(match.group(1)) + 1}"
else:
new_conn_id += '_copy1'

dup_conn = Connection(
new_conn_id,
selected_conn.conn_type,
selected_conn.description,
selected_conn.host,
selected_conn.login,
selected_conn.password,
selected_conn.schema,
selected_conn.port,
selected_conn.extra,
)
base_conn_id = base_conn_id.split('_copy')[0]

potential_connection_ids = [f"{base_conn_id}_copy{i}" for i in range(1, 11)]

query = session.query(Connection.conn_id).filter(Connection.conn_id.in_(potential_connection_ids))

found_conn_id_set = {conn_id for conn_id, in query}

possible_conn_id_iter = (
connection_id
for connection_id in potential_connection_ids
if connection_id not in found_conn_id_set
)
try:
session.add(dup_conn)
session.commit()
flash(f"Connection {new_conn_id} added successfully.", "success")
except IntegrityError:
new_conn_id = next(possible_conn_id_iter)
except StopIteration:
flash(
f"Connection {new_conn_id} can't be added. Integrity error, probably unique constraint.",
f"Connection {new_conn_id} can't be added because it already exists, "
f"Please rename the existing connections",
"warning",
)
session.rollback()
else:

dup_conn = Connection(
new_conn_id,
selected_conn.conn_type,
selected_conn.description,
selected_conn.host,
selected_conn.login,
selected_conn.password,
selected_conn.schema,
selected_conn.port,
selected_conn.extra,
)

try:
session.add(dup_conn)
session.commit()
flash(f"Connection {new_conn_id} added successfully.", "success")
except IntegrityError:
flash(
f"Connection {new_conn_id} can't be added. Integrity error, "
f"probably unique constraint.",
"warning",
)
session.rollback()

self.update_redirect()
return redirect(self.get_redirect())
Expand Down
32 changes: 32 additions & 0 deletions tests/www/views/test_views_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,35 @@ def test_duplicate_connection(admin_client):
response = {conn[0] for conn in session.query(Connection.conn_id).all()}
assert resp.status_code == 200
assert expected_result == response


def test_duplicate_connection_error(admin_client):
"""Test Duplicate multiple connection with suffix
when there are already 10 copies, no new copy
should be created"""

connection_ids = [f'test_duplicate_postgres_connection_copy{i}' for i in range(1, 11)]
connections = [
Connection(
conn_id=connection_id,
conn_type='FTP',
description='Postgres',
host='localhost',
schema='airflow',
port=3306,
)
for connection_id in connection_ids
]

with create_session() as session:
session.query(Connection).delete()
session.add_all(connections)

data = {"action": "mulduplicate", "rowid": [connections[0].id]}
resp = admin_client.post('/connection/action_post', data=data, follow_redirects=True)

expected_result = {f'test_duplicate_postgres_connection_copy{i}' for i in range(1, 11)}

assert resp.status_code == 200
response = {conn[0] for conn in session.query(Connection.conn_id).all()}
assert expected_result == response

0 comments on commit 3ddb365

Please sign in to comment.