Improve test performance #1953

Merged

Changes from all commits
39 commits
4ebb4ff
Mark slow tests as slow
dagardner-nv Oct 15, 2024
5573454
Replace deprecated config modifications
dagardner-nv Oct 15, 2024
5ee378e
Tell pytest not to look in the tests/_utils dir, avoids warnings abou…
dagardner-nv Oct 15, 2024
8000ec3
HuggingFaceEmbeddings is now in langchain_community
dagardner-nv Oct 15, 2024
c98110a
Remove print
dagardner-nv Oct 15, 2024
a3e71b3
PyPDFLoader moved to langchain_community
dagardner-nv Oct 15, 2024
be4597a
Remove deprecated infer_datetime_format argument per https://pandas.p…
dagardner-nv Oct 15, 2024
10ad08a
Specify weights_only=False to avoid a FutureWarning
dagardner-nv Oct 15, 2024
366f4ff
Remove deprecated extra = "forbid"
dagardner-nv Oct 15, 2024
4915dac
validator has been replaced with field_validator
dagardner-nv Oct 15, 2024
552eceb
Fix redefined function name
dagardner-nv Oct 16, 2024
bf95dbb
Avoid warning about cudf.read_json wanting StringIO and not strings
dagardner-nv Oct 16, 2024
697e1f5
Bump unittest timeout to 90 minutes
dagardner-nv Oct 16, 2024
51ec950
Merge branch 'branch-24.10' into david-2410-warnings
dagardner-nv Oct 16, 2024
b178bbc
Move import into fixture to avoid invoking during test discovery
dagardner-nv Oct 16, 2024
a13e25b
Remove redundant langchain fixture impl
dagardner-nv Oct 16, 2024
3ee6182
Restructure the rwd_conf fixture such that the file is read from disk…
dagardner-nv Oct 16, 2024
fe7f2a0
morpheus_sys_path should be session scope
dagardner-nv Oct 16, 2024
ea6b154
Consolidate dask and mlflow fixtures
dagardner-nv Oct 16, 2024
1a1c194
Consolidate langchain fixtures, fix conda command
dagardner-nv Oct 16, 2024
f591ea3
redefine fixtures as session fixtures
dagardner-nv Oct 16, 2024
b8301fd
Remove unused imports
dagardner-nv Oct 16, 2024
d3e70b9
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Oct 16, 2024
b664298
Revert "Remove deprecated extra = "forbid""
dagardner-nv Oct 16, 2024
cad8c46
Use ConfigDict(extra='forbid') to replace nested config class
dagardner-nv Oct 16, 2024
657438d
Remove unneeded pylint rule disables
dagardner-nv Oct 16, 2024
3dea6d0
Use model_config rather than the decorator
dagardner-nv Oct 16, 2024
8583cb0
Drop the num_records=100 parameter
dagardner-nv Oct 16, 2024
a179cb2
Give the input and output topics the same postfix
dagardner-nv Oct 16, 2024
6ff03fb
Consolidate the milvus_service fixture
dagardner-nv Oct 16, 2024
65c97ad
Disable milvus' built-in wait_for_started functionality, instead use …
dagardner-nv Oct 17, 2024
767b2ae
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Oct 17, 2024
9faf0e9
Remove some test parameters lowering the total number of test combina…
dagardner-nv Oct 17, 2024
03592c7
Since the rag pipeline only reads from milvus, refactor the rag tests…
dagardner-nv Oct 17, 2024
91c1c96
Remove unused imports
dagardner-nv Oct 17, 2024
0e6d5a0
Ignore code.visualstudio.com, these urls are being forbidden when run…
dagardner-nv Oct 17, 2024
dec56f7
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Oct 18, 2024
46e5e2b
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Oct 18, 2024
1473967
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Oct 18, 2024
8 changes: 6 additions & 2 deletions tests/_utils/kafka.py
@@ -42,11 +42,15 @@

@pytest.fixture(name='kafka_topics', scope='function')
def kafka_topics_fixture():
yield KafkaTopics(f'morpheus_unittest_input_{time.time()}', f'morpheus_unittest_output_{time.time()}')
"""
Every test receives a unique pair of Kafka topics
"""
ts = time.time()
yield KafkaTopics(f'morpheus_unittest_input_{ts}', f'morpheus_unittest_output_{ts}')


@pytest.fixture(name='kafka_bootstrap_servers', scope="function")
def kafka_bootstrap_servers_fixture(kafka_server: (subprocess.Popen, int)): # pylint: disable=redefined-outer-name
def kafka_bootstrap_servers_fixture(kafka_server: tuple[subprocess.Popen, int]): # pylint: disable=redefined-outer-name
"""
Used by tests that require both an input and an output topic
"""
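
Reading the timestamp once gives the input and output topics the same postfix (commit a179cb2) and keeps every test run on fresh topics. A minimal self-contained sketch of the fixture and a test relying on that invariant (the test itself is illustrative, not from the PR):

import time
from collections import namedtuple

import pytest

KafkaTopics = namedtuple('KafkaTopics', ['input_topic', 'output_topic'])


@pytest.fixture(name='kafka_topics', scope='function')
def kafka_topics_fixture():
    # A single timestamp guarantees the pair shares a postfix and that repeated
    # runs never collide on stale topics.
    ts = time.time()
    yield KafkaTopics(f'morpheus_unittest_input_{ts}', f'morpheus_unittest_output_{ts}')


def test_topics_share_postfix(kafka_topics: KafkaTopics):
    # Illustrative assertion: both names end with the same timestamp.
    assert kafka_topics.input_topic.rsplit('_', 1)[-1] == kafka_topics.output_topic.rsplit('_', 1)[-1]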
59 changes: 44 additions & 15 deletions tests/conftest.py
@@ -502,14 +502,12 @@ def disable_gc():
gc.enable()


def wait_for_camouflage(host="localhost", port=8000, timeout=30):
def wait_for_server(url: str, timeout: int, parse_fn: typing.Callable[[requests.Response], bool]) -> bool:

start_time = time.time()
cur_time = start_time
end_time = start_time + timeout

url = f"http://{host}:{port}/ping"

while cur_time - start_time <= timeout:
timeout_epoch = min(cur_time + 2.0, end_time)

@@ -518,13 +518,9 @@ def wait_for_camouflage(host="localhost", port=8000, timeout=30):
resp = requests.get(url, timeout=request_timeout)

if (resp.status_code == 200):
if (resp.json()['message'] == 'I am alive.'):
if parse_fn(resp):
return True

warnings.warn(("Camoflage returned status 200 but had incorrect response JSON. Continuing to wait. "
"Response JSON:\n%s"),
resp.json())

except Exception:
pass

@@ -537,6 +531,31 @@ def wait_for_camouflage(host="localhost", port=8000, timeout=30):
return False


def wait_for_camouflage(host: str = "localhost", port: int = 8000, timeout: int = 30):
url = f"http://{host}:{port}/ping"

def parse_fn(resp: requests.Response) -> bool:
if (resp.json()['message'] == 'I am alive.'):
return True

warnings.warn(("Camouflage returned status 200 but had incorrect response JSON. Continuing to wait. "
"Response JSON:\n%s") % resp.json())
return False

return wait_for_server(url, timeout=timeout, parse_fn=parse_fn)


def wait_for_milvus(host: str = "localhost", port: int = 19530, timeout: int = 180):
url = f'http://{host}:{port}/healthz'

def parse_fn(resp: requests.Response) -> bool:
content = resp.content.decode('utf-8')
return 'OK' in content

return wait_for_server(url, timeout=timeout, parse_fn=parse_fn)


def _set_pdeathsig(sig=signal.SIGTERM):
"""
Helper function to ensure that once the parent process exits, its child processes will automatically die
@@ -954,18 +973,21 @@ def milvus_server_uri(tmp_path_factory):
yield uri

else:
from milvus import default_server
from milvus import MilvusServer

# Milvus checks for already bound ports but it doesn't seem to work for webservice_port. Use a random one
default_server.webservice_port = _get_random_port()
with default_server:
default_server.set_base_dir(tmp_path_factory.mktemp("milvus_store"))
milvus_server = MilvusServer(wait_for_started=False)

host = default_server.server_address
port = default_server.listen_port
# Milvus checks for already bound ports but it doesn't seem to work for webservice_port. Use a random one
webservice_port = _get_random_port()
milvus_server.webservice_port = webservice_port
milvus_server.set_base_dir(tmp_path_factory.mktemp("milvus_store"))
with milvus_server:
host = milvus_server.server_address
port = milvus_server.listen_port
uri = f"http://{host}:{port}"

logger.info("Started Milvus at: %s", uri)
wait_for_milvus(host=host, port=webservice_port, timeout=180)

yield uri

@@ -976,6 +998,13 @@ def milvus_data_fixture():
yield inital_data


@pytest.fixture(scope="session", name="milvus_service")
def milvus_service_fixture(milvus_server_uri: str):
from morpheus_llm.service.vdb.milvus_vector_db_service import MilvusVectorDBService
service = MilvusVectorDBService(uri=milvus_server_uri)
yield service


@pytest.fixture(scope="session", name="idx_part_collection_config")
def idx_part_collection_config_fixture():
from _utils import load_json_file
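
The conftest.py changes above factor the polling loop out of wait_for_camouflage into a generic wait_for_server, so a new readiness check such as wait_for_milvus only needs a URL and a parse_fn. A condensed, self-contained sketch of the pattern; the wait_for_triton extension is hypothetical and not part of this PR:

import time
import typing

import requests


def wait_for_server(url: str, timeout: int, parse_fn: typing.Callable[[requests.Response], bool]) -> bool:
    """Poll url until parse_fn accepts a 200 response, or timeout seconds elapse."""
    start_time = time.time()
    while time.time() - start_time <= timeout:
        try:
            resp = requests.get(url, timeout=2.0)
            if resp.status_code == 200 and parse_fn(resp):
                return True
        except requests.RequestException:
            pass  # server not up yet, keep polling

        time.sleep(1.0)

    return False


# Hypothetical additional waiter built on the same helper (endpoint taken from
# Triton's standard HTTP health API, not from this PR):
def wait_for_triton(host: str = "localhost", port: int = 8000, timeout: int = 60) -> bool:
    return wait_for_server(f"http://{host}:{port}/v2/health/ready", timeout=timeout, parse_fn=lambda _: True)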
7 changes: 3 additions & 4 deletions tests/morpheus/common/test_http_server.py
@@ -38,15 +38,14 @@ def make_parse_fn(status: HTTPStatus = HTTPStatus.OK,

@pytest.mark.slow
@pytest.mark.parametrize("endpoints", [("/t1", "/t2", "/t3"), ("test/", "123/", "a1d/"), ("/a", "/a/b", "/a/b/c/d")])
@pytest.mark.parametrize("port", [8088, 9090])
@pytest.mark.parametrize("method", ["GET", "POST", "PUT"])
@pytest.mark.parametrize("port", [9090])
@pytest.mark.parametrize("method", ["GET", "POST"])
@pytest.mark.parametrize("use_callback", [True, False])
@pytest.mark.parametrize("use_context_mgr", [True, False])
@pytest.mark.parametrize("num_threads", [1, 2, min(8, len(os.sched_getaffinity(0)))])
@pytest.mark.parametrize("num_threads", [1, min(8, len(os.sched_getaffinity(0)))])
@pytest.mark.parametrize("status,content_type,content",
[(HTTPStatus.OK, MimeTypes.TEXT.value, "OK"),
(HTTPStatus.OK, MimeTypes.JSON.value, '{"test": "OK"}'),
(HTTPStatus.NOT_FOUND, MimeTypes.TEXT.value, "NOT FOUND"),
(HTTPStatus.INTERNAL_SERVER_ERROR, MimeTypes.TEXT.value, "Unexpected error")])
def test_simple_request(port: int,
endpoints: typing.Tuple[str, str, str],
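
The pruned parametrize grids above compound multiplicatively. Illustrative arithmetic (not code from the PR), counting only the axes whose old and new values are both visible in the diff:

# ports 2 -> 1, methods 3 -> 2, thread counts 3 -> 2; the remaining axes are unchanged.
ports_before, ports_after = 2, 1
methods_before, methods_after = 3, 2
threads_before, threads_after = 3, 2

reduction = (ports_before * methods_before * threads_before) / (ports_after * methods_after * threads_after)
print(f"{reduction}x fewer test_simple_request cases")  # 4.5x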
63 changes: 17 additions & 46 deletions tests/morpheus/stages/test_kafka_source_stage_pipe.py
@@ -30,6 +30,7 @@
from morpheus.config import Config
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.general.trigger_stage import TriggerStage
from morpheus.stages.input.kafka_source_stage import AutoOffsetReset
from morpheus.stages.input.kafka_source_stage import KafkaSourceStage
from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
@@ -52,7 +53,7 @@ def test_kafka_source_stage_pipe(config: Config, kafka_bootstrap_servers: str, k
KafkaSourceStage(config,
bootstrap_servers=kafka_bootstrap_servers,
input_topic=kafka_topics.input_topic,
auto_offset_reset="earliest",
auto_offset_reset=AutoOffsetReset.EARLIEST,
poll_interval="1seconds",
client_id='morpheus_kafka_source_stage_pipe',
stop_after=num_records))
@@ -85,7 +86,7 @@ def test_multi_topic_kafka_source_stage_pipe(config: Config, kafka_bootstrap_ser
KafkaSourceStage(config,
bootstrap_servers=kafka_bootstrap_servers,
input_topic=input_topics,
auto_offset_reset="earliest",
auto_offset_reset=AutoOffsetReset.EARLIEST,
poll_interval="1seconds",
client_id='test_multi_topic_kafka_source_stage_pipe',
stop_after=num_records))
@@ -100,38 +101,41 @@ def test_multi_topic_kafka_source_stage_pipe(config: Config, kafka_bootstrap_ser
@pytest.mark.gpu_and_cpu_mode
@pytest.mark.kafka
@pytest.mark.parametrize('async_commits', [True, False])
@pytest.mark.parametrize('num_records', [10, 100, 1000])
def test_kafka_source_commit(num_records: int,
async_commits: bool,
config: Config,
kafka_bootstrap_servers: str,
kafka_topics: KafkaTopics,
kafka_consumer: "KafkaConsumer") -> None:
@pytest.mark.parametrize('num_records', [10, 1000])
def test_kafka_source_batch_pipe(num_records: int,
async_commits: bool,
config: Config,
kafka_bootstrap_servers: str,
kafka_topics: KafkaTopics,
kafka_consumer: "KafkaConsumer") -> None:
group_id = 'morpheus'

data = [{'v': i} for i in range(num_records)]
num_written = write_data_to_kafka(kafka_bootstrap_servers, kafka_topics.input_topic, data)
assert num_written == num_records

expected_length = config.pipeline_batch_size
num_exact = num_records // expected_length

kafka_consumer.subscribe([kafka_topics.input_topic])
seek_to_beginning(kafka_consumer)
partitions = kafka_consumer.assignment()

# This method does not advance the consumer, and even if it did, this consumer has a different group_id than the
# source stage
# This method does not advance the consumer, and this consumer has a different group_id than the source stage
expected_offsets = kafka_consumer.end_offsets(partitions)

pipe = LinearPipeline(config)
pipe.set_source(
KafkaSourceStage(config,
bootstrap_servers=kafka_bootstrap_servers,
input_topic=kafka_topics.input_topic,
auto_offset_reset="earliest",
auto_offset_reset=AutoOffsetReset.EARLIEST,
poll_interval="1seconds",
group_id=group_id,
client_id='morpheus_kafka_source_commit',
stop_after=num_records,
async_commits=async_commits))
pipe.add_stage(DFPLengthChecker(config, expected_length=expected_length, num_exact=num_exact))
pipe.add_stage(TriggerStage(config))
pipe.add_stage(DeserializeStage(config))
pipe.add_stage(SerializeStage(config))
@@ -148,39 +152,6 @@ def test_kafka_source_commit(num_records: int,
# The broker may have created additional partitions, offsets should be a superset of expected_offsets
for (topic_partition, expected_offset) in expected_offsets.items():
# The value of the offsets dict being returned is a tuple of (offset, metadata), while the value of the
# expected_offsets is just the offset.
actual_offset = offsets[topic_partition][0]
assert actual_offset == expected_offset


@pytest.mark.gpu_and_cpu_mode
@pytest.mark.kafka
@pytest.mark.parametrize('num_records', [1000])
def test_kafka_source_batch_pipe(config: Config,
kafka_bootstrap_servers: str,
kafka_topics: KafkaTopics,
num_records: int) -> None:
data = [{'v': i} for i in range(num_records)]
num_written = write_data_to_kafka(kafka_bootstrap_servers, kafka_topics.input_topic, data)
assert num_written == num_records

expected_length = config.pipeline_batch_size
num_exact = num_records // expected_length

pipe = LinearPipeline(config)
pipe.set_source(
KafkaSourceStage(config,
bootstrap_servers=kafka_bootstrap_servers,
input_topic=kafka_topics.input_topic,
auto_offset_reset="earliest",
poll_interval="1seconds",
client_id='morpheus_kafka_source_stage_pipe',
stop_after=num_records))
pipe.add_stage(DFPLengthChecker(config, expected_length=expected_length, num_exact=num_exact))
pipe.add_stage(DeserializeStage(config))
pipe.add_stage(SerializeStage(config))
comp_stage = pipe.add_stage(
CompareDataFrameStage(config, pd.DataFrame(data=data), include=[r'^v$'], reset_index=True))
pipe.run()

assert_results(comp_stage.get_results())
@@ -34,12 +34,6 @@
from morpheus_llm.stages.llm.llm_engine_stage import LLMEngineStage


@pytest.fixture(scope="module", name="milvus_service")
def milvus_service_fixture(milvus_server_uri: str):
service = MilvusVectorDBService(uri=milvus_server_uri)
yield service


def _build_engine(vdb_service, **similarity_search_kwargs) -> LLMEngine:
mock_embedding = mock.AsyncMock(return_value=[[1.2, 2.3, 3.4], [4.5, 5.6, 6.7]])
engine = LLMEngine()
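
The module-scoped fixture removed above is superseded by the session-scoped milvus_service fixture added in tests/conftest.py, so the Milvus connection is built once per session instead of once per module. A minimal sketch of a test consuming it (the marker, collection name, and has_store_object call are assumed from the Morpheus VectorDBService interface, not shown in this PR):

import pytest


@pytest.mark.milvus
def test_no_scratch_collection(milvus_service):
    # Illustrative check against an assumed collection name.
    assert not milvus_service.has_store_object("morpheus_unittest_scratch")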