Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added cyberpandas support #12

Merged
merged 3 commits into from
Apr 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conda/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ requirements:
- intake
- python
- pandas
- cyberpandas
- dask
- pcapy

Expand Down
2 changes: 1 addition & 1 deletion examples/dump-live.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pcapy

from intake_pcap import IPPacket
from intake_pcap.packet import IPPacket


if __name__ == '__main__':
Expand Down
15 changes: 12 additions & 3 deletions intake_pcap/stream.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections import namedtuple, OrderedDict

import pandas as pd
from cyberpandas import to_ipaddress

import pcapy

Expand All @@ -22,9 +23,9 @@ def __init__(self, reader, protocol, payload):
def dtype(self):
items = [
('time', 'datetime64[ns]'),
('src_host', 'object'),
('src_host', 'ip'),
('src_port', 'u4'),
('dst_host', 'object'),
('dst_host', 'ip'),
('dst_port', 'u4'),
('protocol', 'str')]

Expand Down Expand Up @@ -82,7 +83,15 @@ def decoder(header, data):

columns = FullPacket._fields if self._payload else BasePacket._fields
df = pd.DataFrame(packets, columns=columns)
return df.astype(dtype=self.dtype)

# DataFrame.astype doesn't work with extension types (yet).
# https://github.com/pandas-dev/pandas/issues/20557
known_types = {k: v for k, v in self.dtype.items()
if k not in ('src_host', 'dst_host')}
df = df.astype(known_types)
df['src_host'] = to_ipaddress(df['src_host'])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the input df['src_host'] here? Are we parsing text, bytes or just copying binary data? Seems like there ought to be a way of making this not-slow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT, at this point src_host and dst_host are strings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue is that the fastest way to build an IPArray is from a columnar 64-bit aligned bytestring, whereas these seem to be record based. I'll look a bit deeper (after profiling things).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the input is not already an array of the right structure (e.g., with .view()), it may be useful here to build the new columns using empty arrays and fill them, perhaps in a tight numba loop.

That will be generally true for the rest of the dataframe too.

df['dst_host'] = to_ipaddress(df['dst_host'])
return df


class LiveStream(PacketStream):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
intake
pandas
cyberpandas
dask
libpcap
pcapy
6 changes: 3 additions & 3 deletions tests/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from intake.catalog import Catalog

from .utils import dataframe_has_required_columns
from .utils import assert_dataframe_has_required_columns


@pytest.fixture
Expand All @@ -20,7 +20,7 @@ def test_raw_http(catalog1):
assert metadata['npartitions'] == 1

df = src.read()
assert dataframe_has_required_columns(df, payload=True)
assert_dataframe_has_required_columns(df, payload=True)
assert len(df) == 43

src.close()
Expand All @@ -33,7 +33,7 @@ def test_tcp_http(catalog1):
assert metadata['npartitions'] == 1

df = src.read()
assert dataframe_has_required_columns(df, payload=False)
assert_dataframe_has_required_columns(df, payload=False)
assert len(df) == 41

src.close()
14 changes: 7 additions & 7 deletions tests/test_offline.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,41 @@
from .utils import dataframe_has_required_columns
from .utils import assert_dataframe_has_required_columns


def test_offline_unfiltered(ping_stream):
    """Reading the ping capture with no filter set yields every packet."""
    frame = ping_stream.to_dataframe()
    assert_dataframe_has_required_columns(frame, payload=False)
    assert 96 == len(frame)


def test_offline_filter_tcp(http_stream):
    """A "tcp" BPF filter keeps only the TCP packets of the HTTP capture."""
    http_stream.set_filter("tcp")
    frame = http_stream.to_dataframe()
    assert_dataframe_has_required_columns(frame, payload=False)
    assert 41 == len(frame)


def test_offline_filter_udp(http_stream):
    """A "udp" BPF filter leaves the two UDP packets of the HTTP capture."""
    http_stream.set_filter("udp")
    frame = http_stream.to_dataframe()
    assert_dataframe_has_required_columns(frame, payload=False)
    assert 2 == len(frame)


def test_offline_filter_icmp(http_stream):
    """An "icmp" filter matches nothing: the schema is still present but the frame is empty."""
    http_stream.set_filter("icmp")
    frame = http_stream.to_dataframe()
    assert_dataframe_has_required_columns(frame, payload=False)
    assert 0 == len(frame)


def test_offline_limit(http_stream):
    """Passing n=10 caps the number of packets materialized into the frame."""
    frame = http_stream.to_dataframe(n=10)
    assert_dataframe_has_required_columns(frame, payload=False)
    assert 10 == len(frame)


def test_offline_filter_vlan(vlan_stream):
    """A "tcp" filter on the VLAN-tagged capture still decodes its 18 TCP packets."""
    vlan_stream.set_filter("tcp")
    frame = vlan_stream.to_dataframe()
    assert_dataframe_has_required_columns(frame, payload=False)
    assert 18 == len(frame)
12 changes: 6 additions & 6 deletions tests/test_source.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from .utils import dataframe_has_required_columns
from .utils import assert_dataframe_has_required_columns


def test_unfiltered_source(ping_source):
    """discover() reports a single partition and read() returns the full capture."""
    info = ping_source.discover()
    assert 1 == info['npartitions']

    frame = ping_source.read()
    assert_dataframe_has_required_columns(frame, payload=False)
    assert 96 == len(frame)

    ping_source.close()
Expand All @@ -17,7 +17,7 @@ def test_filtered_source(tcp_http_source):
assert metadata['npartitions'] == 1

df = tcp_http_source.read()
assert dataframe_has_required_columns(df, payload=False)
assert_dataframe_has_required_columns(df, payload=False)
assert len(df) == 41

tcp_http_source.close()
Expand All @@ -28,7 +28,7 @@ def test_multiple_source(multiple_source):
assert metadata['npartitions'] == 3

df = multiple_source.read()
assert dataframe_has_required_columns(df, payload=False)
assert_dataframe_has_required_columns(df, payload=False)
assert len(df) == 157

multiple_source.close()
Expand All @@ -39,11 +39,11 @@ def test_repeated_reads(ping_source):
assert metadata['npartitions'] == 1

df = ping_source.read()
assert dataframe_has_required_columns(df, payload=False)
assert_dataframe_has_required_columns(df, payload=False)
assert len(df) == 96

df = ping_source.read()
assert dataframe_has_required_columns(df, payload=False)
assert_dataframe_has_required_columns(df, payload=False)
assert len(df) == 96

ping_source.close()
24 changes: 20 additions & 4 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
import pandas as pd
import pandas.util.testing as tm


def assert_dataframe_has_required_columns(df, payload):
    """Assert that *df* carries the packet-capture schema: column names AND dtypes.

    The expected dtypes mirror ``PacketStream.dtype``: timestamps, the
    cyberpandas ``'ip'`` extension dtype for the host columns, unsigned
    32-bit ports, and object columns for protocol (and payload).

    Parameters
    ----------
    df : pandas.DataFrame
        Frame produced by a pcap stream/source.
    payload : bool
        When True, a trailing object-dtype ``payload`` column is also required.

    Raises
    ------
    AssertionError
        If the columns, their order, or their dtypes differ from the schema.
    """
    # NOTE: the diff rendering had interleaved the deleted implementation
    # (set-based column check) with this one; the stale `columns.append` /
    # early `return` lines are removed here.
    items = [
        ('time', 'datetime64[ns]'),
        ('src_host', 'ip'),
        ('src_port', 'u4'),
        ('dst_host', 'ip'),
        ('dst_port', 'u4'),
        ('protocol', object)
    ]

    if payload:
        items.append(("payload", 'object'))

    names, types = zip(*items)

    # df.dtypes is a Series indexed by column name; comparing against this
    # expected Series checks names, order and dtypes in one shot.
    expected = pd.Series(types, index=names)
    tm.assert_series_equal(df.dtypes, expected)