
Commit

Merge pull request #12 from TomAugspurger/cyberpandas
Added cyberpandas support
seibert authored Apr 25, 2018
2 parents 220d30a + 721c7e3 commit f0c4093
Showing 8 changed files with 51 additions and 24 deletions.
1 change: 1 addition & 0 deletions conda/meta.yaml
@@ -19,6 +19,7 @@ requirements:
     - intake
     - python
     - pandas
+    - cyberpandas
     - dask
     - pcapy

2 changes: 1 addition & 1 deletion examples/dump-live.py
@@ -2,7 +2,7 @@
 
 import pcapy
 
-from intake_pcap import IPPacket
+from intake_pcap.packet import IPPacket
 
 
 if __name__ == '__main__':
15 changes: 12 additions & 3 deletions intake_pcap/stream.py
@@ -1,6 +1,7 @@
 from collections import namedtuple, OrderedDict
 
 import pandas as pd
+from cyberpandas import to_ipaddress
 
 import pcapy

@@ -22,9 +23,9 @@ def __init__(self, reader, protocol, payload):
     def dtype(self):
         items = [
             ('time', 'datetime64[ns]'),
-            ('src_host', 'object'),
+            ('src_host', 'ip'),
             ('src_port', 'u4'),
-            ('dst_host', 'object'),
+            ('dst_host', 'ip'),
             ('dst_port', 'u4'),
             ('protocol', 'str')]

@@ -82,7 +83,15 @@ def decoder(header, data):
 
         columns = FullPacket._fields if self._payload else BasePacket._fields
         df = pd.DataFrame(packets, columns=columns)
-        return df.astype(dtype=self.dtype)
+
+        # DataFrame.astype doesn't work with extension types (yet).
+        # https://github.com/pandas-dev/pandas/issues/20557
+        known_types = {k: v for k, v in self.dtype.items()
+                       if k not in ('src_host', 'dst_host')}
+        df = df.astype(known_types)
+        df['src_host'] = to_ipaddress(df['src_host'])
+        df['dst_host'] = to_ipaddress(df['dst_host'])
+        return df


class LiveStream(PacketStream):
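Note on the intake_pcap/stream.py change: DataFrame.astype could not target pandas extension dtypes at the time (pandas issue 20557), so the conversion happens in two steps, astype() for the ordinary columns and then cyberpandas.to_ipaddress() for the two host columns. Below is a minimal standalone sketch of that pattern, assuming cyberpandas is installed; the column names mirror the packet schema above, but the sample addresses and port values are purely illustrative.

import pandas as pd
from cyberpandas import to_ipaddress

# Toy frame with string hosts, similar to what the packet decoder emits.
df = pd.DataFrame({
    'src_host': ['192.168.0.2', '10.0.0.5'],
    'src_port': [51000, 51001],
    'dst_host': ['93.184.216.34', '8.8.8.8'],
    'dst_port': [80, 53],
})

# Cast the non-extension columns first, then convert the host columns
# to the cyberpandas IP extension type with to_ipaddress().
df = df.astype({'src_port': 'u4', 'dst_port': 'u4'})
df['src_host'] = to_ipaddress(df['src_host'])
df['dst_host'] = to_ipaddress(df['dst_host'])

print(df.dtypes)  # src_host and dst_host now report the 'ip' extension dtype

Keeping the host columns out of the astype() call sidesteps the limitation while still giving every other column its declared dtype.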
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,5 +1,6 @@
 intake
 pandas
+cyberpandas
 dask
 libpcap
 pcapy
6 changes: 3 additions & 3 deletions tests/test_catalog.py
@@ -4,7 +4,7 @@
 
 from intake.catalog import Catalog
 
-from .utils import dataframe_has_required_columns
+from .utils import assert_dataframe_has_required_columns
 
 
 @pytest.fixture
@@ -20,7 +20,7 @@ def test_raw_http(catalog1):
     assert metadata['npartitions'] == 1
 
     df = src.read()
-    assert dataframe_has_required_columns(df, payload=True)
+    assert_dataframe_has_required_columns(df, payload=True)
     assert len(df) == 43
 
     src.close()
@@ -33,7 +33,7 @@ def test_tcp_http(catalog1):
     assert metadata['npartitions'] == 1
 
     df = src.read()
-    assert dataframe_has_required_columns(df, payload=False)
+    assert_dataframe_has_required_columns(df, payload=False)
     assert len(df) == 41
 
     src.close()
14 changes: 7 additions & 7 deletions tests/test_offline.py
@@ -1,41 +1,41 @@
-from .utils import dataframe_has_required_columns
+from .utils import assert_dataframe_has_required_columns
 
 
 def test_offline_unfiltered(ping_stream):
     df = ping_stream.to_dataframe()
-    assert dataframe_has_required_columns(df, payload=False)
+    assert_dataframe_has_required_columns(df, payload=False)
     assert len(df) == 96
 
 
 def test_offline_filter_tcp(http_stream):
     http_stream.set_filter("tcp")
     df = http_stream.to_dataframe()
-    assert dataframe_has_required_columns(df, payload=False)
+    assert_dataframe_has_required_columns(df, payload=False)
     assert len(df) == 41
 
 
 def test_offline_filter_udp(http_stream):
     http_stream.set_filter("udp")
     df = http_stream.to_dataframe()
-    assert dataframe_has_required_columns(df, payload=False)
+    assert_dataframe_has_required_columns(df, payload=False)
     assert len(df) == 2
 
 
 def test_offline_filter_icmp(http_stream):
     http_stream.set_filter("icmp")
     df = http_stream.to_dataframe()
-    assert dataframe_has_required_columns(df, payload=False)
+    assert_dataframe_has_required_columns(df, payload=False)
     assert len(df) == 0
 
 
 def test_offline_limit(http_stream):
     df = http_stream.to_dataframe(n=10)
-    assert dataframe_has_required_columns(df, payload=False)
+    assert_dataframe_has_required_columns(df, payload=False)
     assert len(df) == 10
 
 
 def test_offline_filter_vlan(vlan_stream):
     vlan_stream.set_filter("tcp")
     df = vlan_stream.to_dataframe()
-    assert dataframe_has_required_columns(df, payload=False)
+    assert_dataframe_has_required_columns(df, payload=False)
     assert len(df) == 18
12 changes: 6 additions & 6 deletions tests/test_source.py
@@ -1,12 +1,12 @@
-from .utils import dataframe_has_required_columns
+from .utils import assert_dataframe_has_required_columns
 
 
 def test_unfiltered_source(ping_source):
     metadata = ping_source.discover()
     assert metadata['npartitions'] == 1
 
     df = ping_source.read()
-    assert dataframe_has_required_columns(df, payload=False)
+    assert_dataframe_has_required_columns(df, payload=False)
     assert len(df) == 96
 
     ping_source.close()
@@ -17,7 +17,7 @@ def test_filtered_source(tcp_http_source):
     assert metadata['npartitions'] == 1
 
     df = tcp_http_source.read()
-    assert dataframe_has_required_columns(df, payload=False)
+    assert_dataframe_has_required_columns(df, payload=False)
     assert len(df) == 41
 
     tcp_http_source.close()
@@ -28,7 +28,7 @@ def test_multiple_source(multiple_source):
     assert metadata['npartitions'] == 3
 
     df = multiple_source.read()
-    assert dataframe_has_required_columns(df, payload=False)
+    assert_dataframe_has_required_columns(df, payload=False)
     assert len(df) == 157
 
     multiple_source.close()
@@ -39,11 +39,11 @@ def test_repeated_reads(ping_source):
     assert metadata['npartitions'] == 1
 
     df = ping_source.read()
-    assert dataframe_has_required_columns(df, payload=False)
+    assert_dataframe_has_required_columns(df, payload=False)
     assert len(df) == 96
 
     df = ping_source.read()
-    assert dataframe_has_required_columns(df, payload=False)
+    assert_dataframe_has_required_columns(df, payload=False)
     assert len(df) == 96
 
     ping_source.close()
24 changes: 20 additions & 4 deletions tests/utils.py
@@ -1,5 +1,21 @@
-def dataframe_has_required_columns(df, payload):
-    columns = ['time', 'src_host', 'src_port', 'dst_host', 'dst_port', 'protocol']
+import pandas as pd
+import pandas.util.testing as tm
+
+
+def assert_dataframe_has_required_columns(df, payload):
+    items = [
+        ('time', 'datetime64[ns]'),
+        ('src_host', 'ip'),
+        ('src_port', 'u4'),
+        ('dst_host', 'ip'),
+        ('dst_port', 'u4'),
+        ('protocol', object)
+    ]
+
     if payload:
-        columns.append("payload")
-    return set(df.columns) == set(columns)
+        items.append(("payload", 'object'))
+
+    names, types = zip(*items)
+
+    expected = pd.Series(types, index=names)
+    tm.assert_series_equal(df.dtypes, expected)
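Note on the tests/utils.py change: instead of only comparing column names, the rewritten helper asserts on df.dtypes, which is itself a Series indexed by column name, using pandas.util.testing.assert_series_equal (exposed as pandas.testing.assert_series_equal in later releases). A small sketch of that mechanism, using a hypothetical empty frame and omitting the cyberpandas 'ip' columns so it runs without the extension installed:

import pandas as pd
import pandas.util.testing as tm  # pandas.testing in newer pandas releases

# Hypothetical frame standing in for PacketStream.to_dataframe() output.
df = pd.DataFrame({
    'time': pd.Series([], dtype='datetime64[ns]'),
    'src_port': pd.Series([], dtype='u4'),
    'dst_port': pd.Series([], dtype='u4'),
    'protocol': pd.Series([], dtype=object),
})

# df.dtypes is a Series (index = column names, values = dtypes), so one
# assert_series_equal call validates names, order, and types together.
expected = pd.Series(['datetime64[ns]', 'u4', 'u4', object],
                     index=['time', 'src_port', 'dst_port', 'protocol'])
tm.assert_series_equal(df.dtypes, expected)

Because the helper now raises on mismatch rather than returning a boolean, the callers drop the surrounding assert keyword, which is why the test diffs above change assert dataframe_has_required_columns(...) into a bare assert_dataframe_has_required_columns(...) call.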
