From c26650a990a0eb460a85d78ecf572b05232bfde8 Mon Sep 17 00:00:00 2001 From: josch Date: Thu, 26 Jun 2014 08:47:04 +0200 Subject: [PATCH] bulk commit --- aggr_operators.py | 46 ++--- allen_index.py | 26 +-- allen_ops.py | 54 +++--- filter.py | 1 - filter_validator.py | 6 +- ftreader.pyx | 351 +++++++++++++++++++++++++++++++++++++++ grouper.py | 15 +- grouper_validator.py | 24 +-- groupfilter.py | 1 - groupfilter_validator.py | 41 +++-- include/ftreader.h | 67 ++++++++ lib/ftreader.c | 325 ++++++++++++++++++++++++++++++++++++ merger_validator.py | 7 +- parser.py | 8 +- setup.py | 35 ++++ timeindex.py | 20 +-- ungrouper_validator.py | 5 +- validator_common.py | 17 +- 18 files changed, 904 insertions(+), 145 deletions(-) create mode 100644 ftreader.pyx create mode 100644 include/ftreader.h create mode 100644 lib/ftreader.c create mode 100644 setup.py diff --git a/aggr_operators.py b/aggr_operators.py index 8d198bb..1d013ea 100644 --- a/aggr_operators.py +++ b/aggr_operators.py @@ -1,15 +1,13 @@ import options -from tables import UInt32Col, UInt64Col if options.import_grouper_ops: external_import = __import__(options.import_grouper_ops) class last(object): - __slots__ = ['field', 'gr_field', 'field_type', 'last'] - def __init__(self, field, gr_field, field_type): + __slots__ = ['field', 'gr_field', 'last'] + def __init__(self, field, gr_field): self.field = field self.gr_field = gr_field - self.field_type = field_type self.last = None def __call__(self, record = None): @@ -21,11 +19,10 @@ class last(object): class sum(object): - __slots__ = ['field', 'gr_field', 'field_type','sum'] - def __init__(self, field, gr_field, field_type): + __slots__ = ['field', 'gr_field', 'sum'] + def __init__(self, field, gr_field): self.field = field self.gr_field = gr_field - self.field_type = field_type self.sum = 0 def __call__(self, record = None): @@ -36,11 +33,10 @@ class sum(object): return self.sum class avg(object): - __slots__ = ['field', 'gr_field', 
'field_type','sum','n','avg'] - def __init__(self, field, gr_field, field_type): + __slots__ = ['field', 'gr_field', 'sum','n','avg'] + def __init__(self, field, gr_field): self.field = field self.gr_field = gr_field - self.field_type = field_type self.sum = 0 self.n = 0 self.avg = None @@ -58,11 +54,10 @@ class avg(object): return self.avg class max(object): - __slots__ = ['field', 'gr_field', 'field_type','max'] - def __init__(self, field, gr_field, field_type): + __slots__ = ['field', 'gr_field', 'max'] + def __init__(self, field, gr_field): self.field = field self.gr_field = gr_field - self.field_type = field_type self.max = float("-inf") def __call__(self, record = None): @@ -75,11 +70,10 @@ class max(object): return self.max class min(object): - __slots__ = ['field', 'gr_field', 'field_type','min'] - def __init__(self, field, gr_field, field_type): + __slots__ = ['field', 'gr_field', 'min'] + def __init__(self, field, gr_field): self.field = field self.gr_field = gr_field - self.field_type = field_type self.min = float("inf") def __call__(self, record = None): @@ -92,11 +86,10 @@ class min(object): return self.min class count(object): - __slots__ = ['field', 'gr_field', 'field_type','count'] - def __init__(self, field, gr_field, field_type): + __slots__ = ['field', 'gr_field', 'count'] + def __init__(self, field, gr_field): self.field = field self.gr_field = gr_field - self.field_type = field_type self.count = 0 def __call__(self, record = None): @@ -107,11 +100,10 @@ class count(object): return self.count class union(object): - __slots__ = ['field', 'gr_field', 'field_type','union'] - def __init__(self, field, gr_field, field_type): + __slots__ = ['field', 'gr_field', 'union'] + def __init__(self, field, gr_field): self.field = field self.gr_field = gr_field - self.field_type = field_type self.union = [] def __call__(self, record = None): @@ -122,11 +114,10 @@ class union(object): return self.union class bitAND(object): - __slots__ = ['field', 'gr_field', 
'field_type','bitAND'] - def __init__(self, field, gr_field, field_type): + __slots__ = ['field', 'gr_field', 'bitAND'] + def __init__(self, field, gr_field): self.field = field self.gr_field = gr_field - self.field_type = field_type self.bitAND = pow(2,field_type.size) - 1 # all 1s for the given size def __call__(self, record = None): @@ -137,11 +128,10 @@ class bitAND(object): return self.bitAND class bitOR(object): - __slots__ = ['field', 'gr_field', 'field_type','bitOR'] - def __init__(self, field, gr_field, field_type): + __slots__ = ['field', 'gr_field', 'bitOR'] + def __init__(self, field, gr_field): self.field = field self.gr_field = gr_field - self.field_type = field_type self.bitOR = 0 def __call__(self, record = None): diff --git a/allen_index.py b/allen_index.py index cdc6993..0a08dcb 100644 --- a/allen_index.py +++ b/allen_index.py @@ -9,7 +9,7 @@ class LT(object): self.target = target def __call__(self, x): - return x.etime, x.etime + self.delta + return x.Last, x.Last + self.delta class GT(object): """ @@ -22,7 +22,7 @@ class GT(object): self.target = target def __call__(self, x): - return x.stime - self.delta, x.stime + return x.First - self.delta, x.stime class m(object): """ @@ -36,7 +36,7 @@ class m(object): self.target = target def __call__(self, x): - return x.etime, x.etime + self.delta + return x.Last, x.Last + self.delta class mi(object): """ @@ -50,7 +50,7 @@ class mi(object): self.target = target def __call__(self, x): - return x.stime - self.delta, x.stime + return x.First - self.delta, x.stime class o(object): """ @@ -64,7 +64,7 @@ class o(object): self.target = target def __call__(self, x): - return x.etime-self.delta, x.etime+self.delta + return x.Last-self.delta, x.Last+self.delta class oi(object): """ @@ -77,7 +77,7 @@ class oi(object): self.target = target def __call__(self, x): - return x.stime, x.stime + return x.First, x.stime class d(object): """ @@ -90,7 +90,7 @@ class d(object): self.target = target def __call__(self, x): - 
return x.stime, x.stime + return x.First, x.stime class di(object): """ @@ -103,7 +103,7 @@ class di(object): self.target = target def __call__(self, x): - return x.stime, x.etime + return x.First, x.Last class f(object): @@ -117,7 +117,7 @@ class f(object): self.target = target def __call__(self, x): - return x.etime - self.delta, x.etime + self.delta + return x.Last - self.delta, x.Last + self.delta class fi(object): """ @@ -130,7 +130,7 @@ class fi(object): self.target = target def __call__(self, x): - return x.etime - self.delta, x.etime + self.delta + return x.Last - self.delta, x.Last + self.delta class s(object): """ @@ -143,7 +143,7 @@ class s(object): self.target = target def __call__(self, x): - return x.stime - self.delta, x.stime + self.delta + return x.First - self.delta, x.stime + self.delta class si(object): """ @@ -156,7 +156,7 @@ class si(object): self.target = target def __call__(self, x): - return x.stime - self.delta, x.stime + self.delta + return x.First - self.delta, x.stime + self.delta class EQ(object): """ @@ -169,4 +169,4 @@ class EQ(object): self.target = target def __call__(self, x): - return x.stime - self.delta, x.stime + self.delta + return x.First - self.delta, x.stime + self.delta diff --git a/allen_ops.py b/allen_ops.py index 686d15b..5e6c00b 100644 --- a/allen_ops.py +++ b/allen_ops.py @@ -27,14 +27,14 @@ class AllenOpIndex(object): X < Y x before y """ - return x.etime, x.etime + delta + return x.Last, x.Last + delta def GT(self, x, delta): """ X > Y x after y """ - return x.stime - delta, x.stime + return x.First - delta, x.stime def m(self, x, delta=1): """ @@ -42,7 +42,7 @@ class AllenOpIndex(object): x meets y (x starts before y) y should occur at end of x """ - return x.etime, x.etime + delta + return x.Last, x.Last + delta def mi(self, x, delta=1): """ @@ -50,7 +50,7 @@ class AllenOpIndex(object): inverse x meets y (x starts after y) y should occur at the beginning of x """ - return x.stime - delta, x.stime + return x.First 
- delta, x.stime def o(self, x, delta=1): """ @@ -58,28 +58,28 @@ class AllenOpIndex(object): x overlaps y (x starts before y) y should occur at the end of x """ - return x.etime-delta, x.etime+delta + return x.Last-delta, x.Last+delta def oi(self, x, delta=1): """ X oi Y inverse x overlaps y (x starts after y) """ - return x.stime, x.stime + return x.First, x.stime def d(self, x, delta=0): """ X d Y x during y """ - return x.stime, x.stime + return x.First, x.stime def di(self, x, delta=0): """ X di Y inverse x during y (y during x) """ - return x.stime, x.etime + return x.First, x.Last def f(self, x, delta=1): @@ -88,21 +88,21 @@ class AllenOpIndex(object): x finishes y (x starts after y, x and y end together) """ # delta disregarded here - return x.etime - delta, x.etime + delta + return x.Last - delta, x.Last + delta def fi(self, x, delta=1): """ X fi Y inverse x finishes y (x is finished by y) """ - return x.etime - delta, x.etime + delta + return x.Last - delta, x.Last + delta def s(self, x, delta=1): """ X s Y x starts y (x ends before y, x and y starts together) """ - return x.stime - delta, x.stime + delta + return x.First - delta, x.stime + delta def si(self, x, delta=1): """ @@ -110,7 +110,7 @@ class AllenOpIndex(object): inverse x starts y (x is started by y) """ # delta disregarded here - return x.stime - delta, x.stime + delta + return x.First - delta, x.stime + delta def EQ(self, x, delta=1): """ @@ -118,8 +118,8 @@ class AllenOpIndex(object): X lasts the same time as Y """ # delta disregarded here - return int((x.stime + x.etime)/2) - delta, int((x.stime + - x.etime)/2) + delta + return int((x.First + x.Last)/2) - delta, int((x.stime + + x.Last)/2) + delta def composite_intervals(self, op_x_delta_tuples): intervals = set() @@ -138,14 +138,14 @@ def LT(x, y, delta=0): X < Y x before y """ - return x.etime < y.stime + return x.Last < y.First def GT(x, y, delta=1): """ X > Y x after y """ - return x.stime > y.etime + return x.First > y.Last def m(x, y, 
delta=1): """ @@ -153,7 +153,7 @@ def m(x, y, delta=1): x meets y (x starts before y) y should occur at end of x """ - return abs(x.etime - y.stime) < delta + return abs(x.Last - y.First) < delta def mi(x, y, delta=1): """ @@ -161,7 +161,7 @@ def mi(x, y, delta=1): inverse x meets y (x starts after y) y should occur at the beginning of x """ - return abs(x.stime - y.etime) < delta + return abs(x.First - y.Last) < delta def o(x, y, delta=1): """ @@ -169,28 +169,28 @@ def o(x, y, delta=1): x overlaps y (x starts before y) y should occur at the end of x """ - return y.stime < x.etime < y.etime + return y.First < x.Last < y.Last def oi(x, y, delta=1): """ X oi Y inverse x overlaps y (x starts after y) """ - return y.stime < x.stime < y.etime + return y.First < x.stime < y.Last def d(x, y, delta=0): """ X d Y x during y """ - return y.stime < x.stime and x.etime < y.etime + return y.First < x.stime and x.Last < y.Last def di(x, y, delta=0): """ X di Y inverse x during y (y during x) """ - return y.stime > x.stime and x.etime > y.etime + return y.First > x.stime and x.Last > y.Last def f(x, y, delta=1): @@ -199,21 +199,21 @@ def f(x, y, delta=1): x finishes y (x starts after y, x and y end together) """ # delta disregarded here - return x.stime > y.etime and abs(x.etime - y.etime) < delta + return x.First > y.Last and abs(x.Last - y.Last) < delta def fi(x, y, delta=1): """ X fi Y inverse x finishes y (x is finished by y) """ - return x.stime < y.etime and abs(x.etime - y.etime) < delta + return x.First < y.Last and abs(x.Last - y.Last) < delta def s(x, y, delta=1): """ X s Y x starts y (x ends before y, x and y start together) """ - return x.etime < y.etime and abs(x.stime - y.stime) < delta + return x.Last < y.Last and abs(x.First - y.stime) < delta def si(x, y, delta=1): """ @@ -221,7 +221,7 @@ def si(x, y, delta=1): inverse x starts y (x is started by y) """ # delta disregarded here - return x.etime > y.etime and abs(x.stime - y.stime) < delta + return x.Last > y.Last 
and abs(x.First - y.stime) < delta def EQ(x, y, delta=1): """ @@ -229,4 +229,4 @@ def EQ(x, y, delta=1): inverse x finishes y (x is finished by y) """ # delta disregarded here - return abs(x.stime - y.stime) < delta and abs(x.etime - y.etime) < delta + return abs(x.First - y.stime) < delta and abs(x.Last - y.Last) < delta diff --git a/filter.py b/filter.py index 1082688..204b80c 100644 --- a/filter.py +++ b/filter.py @@ -1,7 +1,6 @@ from copy import deepcopy from copy import copy from statement import Field -from record import RecordReader import time import profiler diff --git a/filter_validator.py b/filter_validator.py index fe7b76f..242da05 100644 --- a/filter_validator.py +++ b/filter_validator.py @@ -1,6 +1,5 @@ from validator_common import * from copy import deepcopy -from record import RecordReader from statement import FilterRef from filter import Rule as RuleImpl from filter import Filter as FilterImpl @@ -17,11 +16,10 @@ class FilterValidator(object): # get_input_reader()comes from validator_common.py, takes parsed query # as an input and returns a reader for the parser's input - a reader # object for an HDF table of flow records - self.fields = get_input_fields_types(get_input_reader(self.parser)).keys() self.pseudo_branches = {} # Argument is a reader object that has an access to the description of the # stored records, and can create a list of available fields - self.input_reader = RecordReader(get_input_reader(parser)) + self.input_reader = get_input_reader(parser) self.impl = self.create_impl() def check_for_unused_filters(self): @@ -49,7 +47,7 @@ class FilterValidator(object): "Check record field references, for unknown fields" for filter in self.filters: for rule in iterate_rules(filter): - check_rule_fields(rule, self.fields) + check_rule_fields(rule, get_input_reader(self.parser)) def change_branch_names_to_id(self): """ diff --git a/ftreader.pyx b/ftreader.pyx new file mode 100644 index 0000000..db88969 --- /dev/null +++ b/ftreader.pyx @@ -0,0 
+1,351 @@ +#cimport libc.stdio +#cimport posix.fcntl + +cdef extern from "ftlib.h": + ctypedef unsigned long long u_int64 + ctypedef unsigned int u_int32 + ctypedef unsigned short u_int16 + ctypedef unsigned char u_int8 + + struct fts3rec_all: + u_int32 *unix_secs + u_int32 *unix_nsecs + u_int32 *sysUpTime + u_int32 *exaddr + u_int32 *srcaddr + u_int32 *dstaddr + u_int32 *nexthop + u_int16 *input + u_int16 *output + u_int32 *dFlows + u_int32 *dPkts + u_int32 *dOctets + u_int32 *First + u_int32 *Last + u_int16 *srcport + u_int16 *dstport + u_int8 *prot + u_int8 *tos + u_int8 *tcp_flags + u_int8 *engine_type + u_int8 *engine_id + u_int8 *src_mask + u_int8 *dst_mask + u_int16 *src_as + u_int16 *dst_as + u_int8 *in_encaps + u_int8 *out_encaps + u_int32 *peer_nexthop + u_int32 *router_sc + u_int32 *src_tag + u_int32 *dst_tag + u_int32 *extra_pkts + u_int8 *marked_tos + + struct ftio: + pass + + struct fts3rec_offsets: + pass + + struct ftver: + pass + + cdef enum ft_xfields: + FT_XFIELD_UNIX_SECS = 0x0000000000000001LL + FT_XFIELD_UNIX_NSECS = 0x0000000000000002LL + FT_XFIELD_SYSUPTIME = 0x0000000000000004LL + FT_XFIELD_EXADDR = 0x0000000000000008LL + FT_XFIELD_DFLOWS = 0x0000000000000010LL + FT_XFIELD_DPKTS = 0x0000000000000020LL + FT_XFIELD_DOCTETS = 0x0000000000000040LL + FT_XFIELD_FIRST = 0x0000000000000080LL + FT_XFIELD_LAST = 0x0000000000000100LL + FT_XFIELD_ENGINE_TYPE = 0x0000000000000200LL + FT_XFIELD_ENGINE_ID = 0x0000000000000400LL + FT_XFIELD_SRCADDR = 0x0000000000001000LL + FT_XFIELD_DSTADDR = 0x0000000000002000LL + FT_XFIELD_NEXTHOP = 0x0000000000010000LL + FT_XFIELD_INPUT = 0x0000000000020000LL + FT_XFIELD_OUTPUT = 0x0000000000040000LL + FT_XFIELD_SRCPORT = 0x0000000000080000LL + FT_XFIELD_DSTPORT = 0x0000000000100000LL + FT_XFIELD_PROT = 0x0000000000200000LL + FT_XFIELD_TOS = 0x0000000000400000LL + FT_XFIELD_TCP_FLAGS = 0x0000000000800000LL + FT_XFIELD_SRC_MASK = 0x0000000001000000LL + FT_XFIELD_DST_MASK = 0x0000000002000000LL + FT_XFIELD_SRC_AS = 
0x0000000004000000LL + FT_XFIELD_DST_AS = 0x0000000008000000LL + FT_XFIELD_IN_ENCAPS = 0x0000000010000000LL + FT_XFIELD_OUT_ENCAPS = 0x0000000020000000LL + FT_XFIELD_PEER_NEXTHOP = 0x0000000040000000LL + FT_XFIELD_ROUTER_SC = 0x0000000080000000LL + FT_XFIELD_EXTRA_PKTS = 0x0000000100000000LL + FT_XFIELD_MARKED_TOS = 0x0000000200000000LL + FT_XFIELD_SRC_TAG = 0x0000000400000000LL + FT_XFIELD_DST_TAG = 0x0000000800000000LL + +cdef extern from "include/ftreader.h": + struct ft_data: + int fd + ftio io + fts3rec_offsets offsets + ftver version + u_int64 xfield + int rec_size + char **records + int numrecords + + ft_data *ft_open(char *filename) + void ft_write(ft_data *data, char *filename) + void ft_records_get_all(ft_data* data, int number, fts3rec_all *record) + u_int32 *ft_records_get_unix_secs(ft_data* data, int number) + u_int32 *ft_records_get_unix_nsecs(ft_data* data, int number) + u_int32 *ft_records_get_sysUpTime(ft_data* data, int number) + u_int32 *ft_records_get_exaddr(ft_data* data, int number) + u_int32 *ft_records_get_srcaddr(ft_data* data, int number) + u_int32 *ft_records_get_dstaddr(ft_data* data, int number) + u_int32 *ft_records_get_nexthop(ft_data* data, int number) + u_int16 *ft_records_get_input(ft_data* data, int number) + u_int16 *ft_records_get_output(ft_data* data, int number) + u_int32 *ft_records_get_dFlows(ft_data* data, int number) + u_int32 *ft_records_get_dPkts(ft_data* data, int number) + u_int32 *ft_records_get_dOctets(ft_data* data, int number) + u_int32 *ft_records_get_First(ft_data* data, int number) + u_int32 *ft_records_get_Last(ft_data* data, int number) + u_int16 *ft_records_get_srcport(ft_data* data, int number) + u_int16 *ft_records_get_dstport(ft_data* data, int number) + u_int8 *ft_records_get_prot(ft_data* data, int number) + u_int8 *ft_records_get_tos(ft_data* data, int number) + u_int8 *ft_records_get_tcp_flags(ft_data* data, int number) + u_int8 *ft_records_get_engine_type(ft_data* data, int number) + u_int8 
*ft_records_get_engine_id(ft_data* data, int number) + u_int8 *ft_records_get_src_mask(ft_data* data, int number) + u_int8 *ft_records_get_dst_mask(ft_data* data, int number) + u_int16 *ft_records_get_src_as(ft_data* data, int number) + u_int16 *ft_records_get_dst_as(ft_data* data, int number) + u_int8 *ft_records_get_in_encaps(ft_data* data, int number) + u_int8 *ft_records_get_out_encaps(ft_data* data, int number) + u_int32 *ft_records_get_peer_nexthop(ft_data* data, int number) + u_int32 *ft_records_get_router_sc(ft_data* data, int number) + u_int32 *ft_records_get_src_tag(ft_data* data, int number) + u_int32 *ft_records_get_dst_tag(ft_data* data, int number) + u_int32 *ft_records_get_extra_pkts(ft_data* data, int number) + u_int8 *ft_records_get_marked_tos(ft_data* data, int number) + +cdef class FtReader: + cdef ft_data *data + + def __init__(self, filename): + self.data = ft_open(filename) + + def get_numrecords(self): + return self.data.numrecords + + def supports_attr(self, attr): + if attr == "unix_secs": + return bool(self.data.xfield & FT_XFIELD_UNIX_SECS) + elif attr == "unix_nsecs": + return bool(self.data.xfield & FT_XFIELD_UNIX_NSECS) + elif attr == "sysUpTime": + return bool(self.data.xfield & FT_XFIELD_SYSUPTIME) + elif attr == "exaddr": + return bool(self.data.xfield & FT_XFIELD_EXADDR) + elif attr == "srcaddr": + return bool(self.data.xfield & FT_XFIELD_SRCADDR) + elif attr == "dstaddr": + return bool(self.data.xfield & FT_XFIELD_DSTADDR) + elif attr == "nexthop": + return bool(self.data.xfield & FT_XFIELD_NEXTHOP) + elif attr == "input": + return bool(self.data.xfield & FT_XFIELD_INPUT) + elif attr == "output": + return bool(self.data.xfield & FT_XFIELD_OUTPUT) + elif attr == "dFlows": + return bool(self.data.xfield & FT_XFIELD_DFLOWS) + elif attr == "dPkts": + return bool(self.data.xfield & FT_XFIELD_DPKTS) + elif attr == "dOctets": + return bool(self.data.xfield & FT_XFIELD_DOCTETS) + elif attr == "First": + return bool(self.data.xfield & 
FT_XFIELD_FIRST) + elif attr == "Last": + return bool(self.data.xfield & FT_XFIELD_LAST) + elif attr == "srcport": + return bool(self.data.xfield & FT_XFIELD_SRCPORT) + elif attr == "dstport": + return bool(self.data.xfield & FT_XFIELD_DSTPORT) + elif attr == "prot": + return bool(self.data.xfield & FT_XFIELD_PROT) + elif attr == "tos": + return bool(self.data.xfield & FT_XFIELD_TOS) + elif attr == "tcp_flags": + return bool(self.data.xfield & FT_XFIELD_TCP_FLAGS) + elif attr == "engine_type": + return bool(self.data.xfield & FT_XFIELD_ENGINE_TYPE) + elif attr == "engine_id": + return bool(self.data.xfield & FT_XFIELD_ENGINE_ID) + elif attr == "src_mask": + return bool(self.data.xfield & FT_XFIELD_SRC_MASK) + elif attr == "dst_mask": + return bool(self.data.xfield & FT_XFIELD_DST_MASK) + elif attr == "src_as": + return bool(self.data.xfield & FT_XFIELD_SRC_AS) + elif attr == "dst_as": + return bool(self.data.xfield & FT_XFIELD_DST_AS) + elif attr == "in_encaps": + return bool(self.data.xfield & FT_XFIELD_IN_ENCAPS) + elif attr == "out_encaps": + return bool(self.data.xfield & FT_XFIELD_OUT_ENCAPS) + elif attr == "peer_nexthop": + return bool(self.data.xfield & FT_XFIELD_PEER_NEXTHOP) + elif attr == "router_sc": + return bool(self.data.xfield & FT_XFIELD_ROUTER_SC) + elif attr == "src_tag": + return bool(self.data.xfield & FT_XFIELD_SRC_TAG) + elif attr == "dst_tag": + return bool(self.data.xfield & FT_XFIELD_DST_TAG) + elif attr == "extra_pkts": + return bool(self.data.xfield & FT_XFIELD_EXTRA_PKTS) + elif attr == "marked_tos": + return bool(self.data.xfield & FT_XFIELD_MARKED_TOS) + else: + return False + + def get_record(self, num): + cdef fts3rec_all record + ft_records_get_all(self.data, num, &record) + return (record.unix_secs[0], + record.unix_nsecs[0], + record.sysUpTime[0], + record.exaddr[0], + record.srcaddr[0], + record.dstaddr[0], + record.nexthop[0], + record.input[0], + record.output[0], + record.dFlows[0], + record.dPkts[0], + record.dOctets[0], + 
record.First[0], + record.Last[0], + record.srcport[0], + record.dstport[0], + record.prot[0], + record.tos[0], + record.tcp_flags[0], + record.engine_type[0], + record.engine_id[0], + record.src_mask[0], + record.dst_mask[0], + record.src_as[0], + record.dst_as[0], + record.in_encaps[0], + record.out_encaps[0], + record.peer_nexthop[0], + record.router_sc[0], + record.src_tag[0], + record.dst_tag[0], + record.extra_pkts[0], + record.marked_tos[0] + ) + + def get_unix_secs(self, num): + return ft_records_get_unix_secs(self.data, num)[0] + + def get_unix_nsecs(self, num): + return ft_records_get_unix_nsecs(self.data, num)[0] + + def get_sysUpTime(self, num): + return ft_records_get_sysUpTime(self.data, num)[0] + + def get_exaddr(self, num): + return ft_records_get_exaddr(self.data, num)[0] + + def get_srcaddr(self, num): + return ft_records_get_srcaddr(self.data, num)[0] + + def get_dstaddr(self, num): + return ft_records_get_dstaddr(self.data, num)[0] + + def get_nexthop(self, num): + return ft_records_get_nexthop(self.data, num)[0] + + def get_input(self, num): + return ft_records_get_input(self.data, num)[0] + + def get_output(self, num): + return ft_records_get_output(self.data, num)[0] + + def get_dFlows(self, num): + return ft_records_get_dFlows(self.data, num)[0] + + def get_dPkts(self, num): + return ft_records_get_dPkts(self.data, num)[0] + + def get_dOctets(self, num): + return ft_records_get_dOctets(self.data, num)[0] + + def get_First(self, num): + return ft_records_get_First(self.data, num)[0] + + def get_Last(self, num): + return ft_records_get_Last(self.data, num)[0] + + def get_srcport(self, num): + return ft_records_get_srcport(self.data, num)[0] + + def get_dstport(self, num): + return ft_records_get_dstport(self.data, num)[0] + + def get_prot(self, num): + return ft_records_get_prot(self.data, num)[0] + + def get_tos(self, num): + return ft_records_get_tos(self.data, num)[0] + + def get_tcp_flags(self, num): + return 
ft_records_get_tcp_flags(self.data, num)[0] + + def get_engine_type(self, num): + return ft_records_get_engine_type(self.data, num)[0] + + def get_engine_id(self, num): + return ft_records_get_engine_id(self.data, num)[0] + + def get_src_mask(self, num): + return ft_records_get_src_mask(self.data, num)[0] + + def get_dst_mask(self, num): + return ft_records_get_dst_mask(self.data, num)[0] + + def get_src_as(self, num): + return ft_records_get_src_as(self.data, num)[0] + + def get_dst_as(self, num): + return ft_records_get_dst_as(self.data, num)[0] + + def get_in_encaps(self, num): + return ft_records_get_in_encaps(self.data, num)[0] + + def get_out_encaps(self, num): + return ft_records_get_out_encaps(self.data, num)[0] + + def get_peer_nexthop(self, num): + return ft_records_get_peer_nexthop(self.data, num)[0] + + def get_router_sc(self, num): + return ft_records_get_router_sc(self.data, num)[0] + + def get_src_tag(self, num): + return ft_records_get_src_tag(self.data, num)[0] + + def get_dst_tag(self, num): + return ft_records_get_dst_tag(self.data, num)[0] + + def get_extra_pkts(self, num): + return ft_records_get_extra_pkts(self.data, num)[0] + + def get_marked_tos(self, num): + return ft_records_get_marked_tos(self.data, num)[0] diff --git a/grouper.py b/grouper.py index f2156d5..32809f1 100644 --- a/grouper.py +++ b/grouper.py @@ -1,4 +1,3 @@ -import record import options from aggr_operators import count import time @@ -14,7 +13,8 @@ class Grouper(object): self.aggr_ops = aggr_ops self.group_record_fields = self.create_gr_record_fields_list() self.group_record_fields = ('rec_id',) + self.group_record_fields - self.group_record_types = self.create_gr_record_fields_types() + # TODO: blabla + #self.group_record_types = self.create_gr_record_fields_types() self.group_records = [] self.branch_name = branch_name self.Record = record.get_record_class(self.group_record_fields) @@ -107,14 +107,13 @@ class Grouper(object): return tuple(type_list) class AggrOp(object): 
- def __init__(self, op, field, gr_field, field_type): + def __init__(self, op, field, gr_field): self.op = op self.field = field self.gr_field = gr_field # field name used for the grouping of a set of common entries - self.field_type = field_type def new_op(self): - return self.op(self.field, self.gr_field, self.field_type) + return self.op(self.field, self.gr_field) class GrouperModule(object): def __init__(self, name, rules, aggr_ops): @@ -141,8 +140,8 @@ class GrouperRule(object): def check_is_shortcut(self): if self.delta: - if (self.old_rec_field in ('stime', 'etime') and - self.new_rec_field in ('stime', 'etime')): + if (self.old_rec_field in ('First', 'Last') and + self.new_rec_field in ('First', 'Last')): return True return False @@ -216,7 +215,7 @@ class Group(object): if matched: # self.aggr_ops contains the fields from the aggregation statement of the grouper module - # as well as 3 other implicitly stated aggregation operations (etime, stime, records...) + # as well as 3 other implicitly stated aggregation operations (Last, First, records...) 
for aggr_op in self.aggr_ops: aggr_op(record) # print aggr_op.gr_field, aggr_op() diff --git a/grouper_validator.py b/grouper_validator.py index e928ce3..a5c840b 100644 --- a/grouper_validator.py +++ b/grouper_validator.py @@ -1,6 +1,5 @@ from validator_common import * from copy import deepcopy -from tables import UIntAtom, UIntCol from grouper import GrouperModule as GrouperModuleImpl from grouper import Grouper as GrouperImpl from grouper import GrouperRule as GrouperRuleImpl @@ -10,8 +9,6 @@ import profiler class GrouperValidator(object): def __init__(self, parser, splitter_validator): self.parser = parser - self.fields_types = get_input_fields_types( - get_input_reader(self.parser)) self.groupers = deepcopy(parser.groupers) # print splitter_validator.br_name_to_br self.br_name_to_br = splitter_validator.br_name_to_br @@ -66,7 +63,7 @@ class GrouperValidator(object): # Checks if the rule names of modules match those that were established # from the flow records (passed as a second argument here). Defined in # validator_common - check_rule_fields(rule[0], self.fields_types.keys()) + check_rule_fields(rule[0], get_input_reader(self.parser)) # This section checks the correctness of the field names passed to the aggregator # section of the grouper stage. 
field_types are defined in init and are also @@ -76,12 +73,14 @@ class GrouperValidator(object): if type(arg) == Field: mod, _, field = arg.name.partition('.') if field != '': - if field not in self.fields_types.keys(): + if not get_input_reader(self.parser).supports_attr(field) \ + and field != "rec_id": msg = 'There is no such field %s, '%arg.name msg += 'referenced at line %s'%aggr.line raise SyntaxError(msg) else: - if mod not in self.fields_types.keys(): + if not get_input_reader(self.parser).supports_attr(mod) \ + and mod != "rec_id": msg = 'There is no such field %s, '%arg.name msg += 'referenced at line %s'%aggr.line raise SyntaxError(msg) @@ -122,12 +121,11 @@ class GrouperValidator(object): aggr_ops_list = [] del_list = [] for aggr in grouper.aggr: - op, field, gr_field, field_type = self.create_aggr_impl_init_args( - aggr) + op, field, gr_field = self.create_aggr_impl_init_args(aggr) mod_name, _, f = str.partition(field, '.') if f != '': if module.name == mod_name: - impl = AggrOpImpl(op, f, gr_field, field_type) + impl = AggrOpImpl(op, f, gr_field) aggr_ops_list.append(impl) del_list.append(aggr) @@ -143,16 +141,10 @@ class GrouperValidator(object): else: non_qid_field = field gr_field = aggr.args[1] - if aggr.op == 'count': - field_type = UIntCol(self.fields_types['rec_id'].itemsize) - elif aggr.op == 'union': - field_type = UIntAtom(self.fields_types[non_qid_field].itemsize) - else: - field_type = UIntCol(self.fields_types[non_qid_field].itemsize) op = find_op(aggr, 'aggr_operators') - return op, field, gr_field, field_type + return op, field, gr_field def convert_module_rules(self, module): rule_impl_list = [] diff --git a/groupfilter.py b/groupfilter.py index ac6e7cf..205fd57 100644 --- a/groupfilter.py +++ b/groupfilter.py @@ -1,4 +1,3 @@ -from record import RecordReader from filter import Rule import profiler diff --git a/groupfilter_validator.py b/groupfilter_validator.py index d95fe7b..17e1c2a 100644 --- a/groupfilter_validator.py +++ 
b/groupfilter_validator.py @@ -5,7 +5,6 @@ from groupfilter import Rule as RuleImpl from groupfilter import GroupFilter as GroupFilterImpl from groupfilter import AcceptGroupFilter as AcceptGroupFilterImpl from operators import NOT -import pytables from timeindex import TimeIndex import time @@ -39,7 +38,15 @@ class GroupFilterValidator(object): for filter in self.filters: for rule in iterate_rules(filter): for branch in filter.branches: - check_rule_fields(rule, self.branches_fields[branch]) + for arg in rule.args: + if type(arg) is Field: + if arg.name in self.branches_fields[branch]: + continue + else: + msg = 'There is no such field %s, '%arg.name + msg += 'referenced at line %s'%rule.line + raise SyntaxError(msg) + def get_branches_fields(self): @@ -96,14 +103,15 @@ class GroupFilterValidator(object): records = self.br_name_to_grouper[br_name] index = TimeIndex(5000) grouper = records - field_types = dict(zip(grouper.group_record_fields, - grouper.group_record_types)) + # TODO: dont use pytables here + #field_types = dict(zip(grouper.group_record_fields, + # grouper.group_record_types)) # print records - fname = options.temp_path + options.groups_file_prefix - fname += br_name+".h5" - if options.delete_temp_files: if_exists_delete(fname) - file = pytables.create_table_file(fname, field_types) - groups_table = pytables.FlowRecordsTable(fname) # Create separate table files for each of the branches + #fname = options.temp_path + options.groups_file_prefix + #fname += br_name+".h5" + #if options.delete_temp_files: if_exists_delete(fname) + #file = pytables.create_table_file(fname, field_types) + #groups_table = pytables.FlowRecordsTable(fname) # Create separate table files for each of the branches filt_impl = GroupFilterImpl(rules_impl, records, br_name, groups_table, index) group_filters_impl.append(filt_impl) @@ -120,13 +128,14 @@ class GroupFilterValidator(object): records = self.br_name_to_grouper[br_name] index = TimeIndex(5000) grouper = records - 
field_types = dict(zip(grouper.group_record_fields, - grouper.group_record_types)) - fname = options.temp_path + options.groups_file_prefix - fname += br_name+".h5" - if options.delete_temp_files: if_exists_delete(fname) - file = pytables.create_table_file(fname, field_types) - groups_table = pytables.FlowRecordsTable(fname) + # TODO: dont use pytables here + #field_types = dict(zip(grouper.group_record_fields, + # grouper.group_record_types)) + #fname = options.temp_path + options.groups_file_prefix + #fname += br_name+".h5" + #if options.delete_temp_files: if_exists_delete(fname) + #file = pytables.create_table_file(fname, field_types) + #groups_table = pytables.FlowRecordsTable(fname) filt_impl = AcceptGroupFilterImpl(records, br_name, groups_table, index) # This class is called in case some branch is missing # the definition of a group-filter. Essentially a plain diff --git a/include/ftreader.h b/include/ftreader.h new file mode 100644 index 0000000..71da50f --- /dev/null +++ b/include/ftreader.h @@ -0,0 +1,67 @@ +#include + +#ifndef __FTREADER_H +#define __FTREADER_H + +/* +struct ft_data_offsets { + const char *name; + size_t offset; + size_t size; + u_int64 xfield; +}; +*/ + +struct ft_data { + int fd; + struct ftio io; + struct fts3rec_offsets offsets; + struct ftver version; + u_int64 xfield; + int rec_size; + char **records; + int numrecords; +}; + +struct ft_data *ft_open(char *filename); +void ft_write(struct ft_data *data, char *filename); + +void ft_records_get_all(struct ft_data* data, int number, struct fts3rec_all *record); + +u_int32 *ft_records_get_unix_secs(struct ft_data* data, int number); +u_int32 *ft_records_get_unix_nsecs(struct ft_data* data, int number); +u_int32 *ft_records_get_sysUpTime(struct ft_data* data, int number); +u_int32 *ft_records_get_exaddr(struct ft_data* data, int number); +u_int32 *ft_records_get_srcaddr(struct ft_data* data, int number); +u_int32 *ft_records_get_dstaddr(struct ft_data* data, int number); +u_int32 
*ft_records_get_nexthop(struct ft_data* data, int number); +u_int16 *ft_records_get_input(struct ft_data* data, int number); +u_int16 *ft_records_get_output(struct ft_data* data, int number); +u_int32 *ft_records_get_dFlows(struct ft_data* data, int number); +u_int32 *ft_records_get_dPkts(struct ft_data* data, int number); +u_int32 *ft_records_get_dOctets(struct ft_data* data, int number); +u_int32 *ft_records_get_First(struct ft_data* data, int number); +u_int32 *ft_records_get_Last(struct ft_data* data, int number); +u_int16 *ft_records_get_srcport(struct ft_data* data, int number); +u_int16 *ft_records_get_dstport(struct ft_data* data, int number); +u_int8 *ft_records_get_prot(struct ft_data* data, int number); +u_int8 *ft_records_get_tos(struct ft_data* data, int number); +u_int8 *ft_records_get_tcp_flags(struct ft_data* data, int number); +u_int8 *ft_records_get_engine_type(struct ft_data* data, int number); +u_int8 *ft_records_get_engine_id(struct ft_data* data, int number); +u_int8 *ft_records_get_src_mask(struct ft_data* data, int number); +u_int8 *ft_records_get_dst_mask(struct ft_data* data, int number); +u_int16 *ft_records_get_src_as(struct ft_data* data, int number); +u_int16 *ft_records_get_dst_as(struct ft_data* data, int number); +u_int8 *ft_records_get_in_encaps(struct ft_data* data, int number); +u_int8 *ft_records_get_out_encaps(struct ft_data* data, int number); +u_int32 *ft_records_get_peer_nexthop(struct ft_data* data, int number); +u_int32 *ft_records_get_router_sc(struct ft_data* data, int number); +u_int32 *ft_records_get_src_tag(struct ft_data* data, int number); +u_int32 *ft_records_get_dst_tag(struct ft_data* data, int number); +u_int32 *ft_records_get_extra_pkts(struct ft_data* data, int number); +u_int8 *ft_records_get_marked_tos(struct ft_data* data, int number); + +void ft_close(struct ft_data* data); + +#endif diff --git a/lib/ftreader.c b/lib/ftreader.c new file mode 100644 index 0000000..4f51257 --- /dev/null +++ b/lib/ftreader.c 
@@ -0,0 +1,325 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <fcntl.h> +#include <unistd.h> + +#include "ftreader.h" + +struct ft_data *ft_open(char *filename) +{ + int ret; + struct ft_data *data; + char *record; + + data = (struct ft_data *)calloc(1, sizeof(struct ft_data)); + + data->fd = STDIN_FILENO; + + if (filename && strcmp(filename, "-") != 0) { + data->fd = open(filename, O_RDONLY); + if (data->fd == -1) { + perror("could not open file"); + return NULL; + } + } + + ret = ftio_init(&data->io, data->fd, FT_IO_FLAG_READ); + if (ret < 0) { + perror("ftio_init failed"); + return NULL; + } + + ftio_get_ver(&data->io, &data->version); + data->xfield = ftio_xfield(&data->io); + fts3rec_compute_offsets(&data->offsets, &data->version); + data->rec_size = ftio_rec_size(&data->io); + + /* + * TODO: optimize the reallocs here (eg by doubling the space every time + * one runs out of it) + * + * TODO: maybe allocate everything in one big chunk for faster iteration + * + */ + + while ((record = ftio_read(&data->io)) != NULL) { + data->numrecords++; + data->records = (char **)realloc(data->records, sizeof(char *)*data->numrecords); + data->records[data->numrecords-1] = (char *)malloc(data->rec_size); + memcpy(data->records[data->numrecords-1], record, data->rec_size); + } + + return data; +} + +void ft_write(struct ft_data *data, char *filename) +{ + struct ftset ftset; + int outfd; + int ret, i; + struct ftio ftio_out; + + outfd = STDOUT_FILENO; + + ftset_init(&ftset, 0); + + ftset.comments = ftio_get_comment(&data->io); // TODO: make configurable + ftset.byte_order = FT_HEADER_LITTLE_ENDIAN; // TODO: make configurable + ftset.z_level = 6; // from 0-9 TODO: make configurable + + if (filename && strcmp(filename, "-") != 0) { + outfd = open(filename, O_WRONLY|O_CREAT|O_TRUNC, 0644); + if (outfd == -1) { + } + } + + ret = ftio_init(&ftio_out, outfd, FT_IO_FLAG_WRITE | ((ftset.z_level) ?
FT_IO_FLAG_ZINIT : 0)); + if (ret < 0) { + perror("ftio_init() failed"); + return; + } + + ftio_set_byte_order(&ftio_out, ftset.byte_order); + ftio_set_z_level(&ftio_out, ftset.z_level); + ftio_set_streaming(&ftio_out, 0); + ftio_set_debug(&ftio_out, 0); // TODO: make configureable + + ftio_set_preloaded(&ftio_out, 1); + ftio_set_cap_time(&ftio_out, ftio_get_cap_start(&data->io), ftio_get_cap_end(&data->io)); + ftio_set_flows_count(&ftio_out, data->numrecords); + ftio_set_corrupt(&ftio_out, ftio_get_corrupt(&data->io)); + ftio_set_lost(&ftio_out, ftio_get_lost(&data->io)); + + ret = ftio_set_comment(&ftio_out, ftset.comments); + ret = ftio_set_ver(&ftio_out, &data->version); + ret = ftio_write_header(&ftio_out); + + for (i = 0; i < data->numrecords; i++) { + ret = ftio_write(&ftio_out, data->records[i]); + } + + ret = ftio_close(&ftio_out); + close(outfd); +} + +void ft_records_get_all(struct ft_data* data, int number, struct fts3rec_all *record) +{ + record->unix_secs = ft_records_get_unix_secs(data, number); + record->unix_nsecs = ft_records_get_unix_nsecs(data, number); + record->sysUpTime = ft_records_get_sysUpTime(data, number); + record->exaddr = ft_records_get_exaddr(data, number); + record->srcaddr = ft_records_get_srcaddr(data, number); + record->dstaddr = ft_records_get_dstaddr(data, number); + record->nexthop = ft_records_get_nexthop(data, number); + record->input = ft_records_get_input(data, number); + record->output = ft_records_get_output(data, number); + record->dFlows = ft_records_get_dFlows(data, number); + record->dPkts = ft_records_get_dPkts(data, number); + record->dOctets = ft_records_get_dOctets(data, number); + record->First = ft_records_get_First(data, number); + record->Last = ft_records_get_Last(data, number); + record->srcport = ft_records_get_srcport(data, number); + record->dstport = ft_records_get_dstport(data, number); + record->prot = ft_records_get_prot(data, number); + record->tos = ft_records_get_tos(data, number); + 
record->tcp_flags = ft_records_get_tcp_flags(data, number); + record->engine_type = ft_records_get_engine_type(data, number); + record->engine_id = ft_records_get_engine_id(data, number); + record->src_mask = ft_records_get_src_mask(data, number); + record->dst_mask = ft_records_get_dst_mask(data, number); + record->src_as = ft_records_get_src_as(data, number); + record->dst_as = ft_records_get_dst_as(data, number); + record->in_encaps = ft_records_get_in_encaps(data, number); + record->out_encaps = ft_records_get_out_encaps(data, number); + record->peer_nexthop = ft_records_get_peer_nexthop(data, number); + record->router_sc = ft_records_get_router_sc(data, number); + record->src_tag = ft_records_get_src_tag(data, number); + record->dst_tag = ft_records_get_dst_tag(data, number); + record->extra_pkts = ft_records_get_extra_pkts(data, number); + record->marked_tos = ft_records_get_marked_tos(data, number); +} + +u_int32 *ft_records_get_unix_secs(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.unix_secs); +} + +u_int32 *ft_records_get_unix_nsecs(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.unix_nsecs); +} + +u_int32 *ft_records_get_sysUpTime(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.sysUpTime); +} + +u_int32 *ft_records_get_exaddr(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.exaddr); +} + +u_int32 *ft_records_get_srcaddr(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.srcaddr); +} + +u_int32 *ft_records_get_dstaddr(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.dstaddr); +} + +u_int32 *ft_records_get_nexthop(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.nexthop); +} + +u_int16 *ft_records_get_input(struct ft_data* data, 
int number) +{ + return (u_int16 *)(data->records[number] + data->offsets.input); +} + +u_int16 *ft_records_get_output(struct ft_data* data, int number) +{ + return (u_int16 *)(data->records[number] + data->offsets.output); +} + +u_int32 *ft_records_get_dFlows(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.dFlows); +} + +u_int32 *ft_records_get_dPkts(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.dPkts); +} + +u_int32 *ft_records_get_dOctets(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.dOctets); +} + +u_int32 *ft_records_get_First(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.First); +} + +u_int32 *ft_records_get_Last(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.Last); +} + +u_int16 *ft_records_get_srcport(struct ft_data* data, int number) +{ + return (u_int16 *)(data->records[number] + data->offsets.srcport); +} + +u_int16 *ft_records_get_dstport(struct ft_data* data, int number) +{ + return (u_int16 *)(data->records[number] + data->offsets.dstport); +} + +u_int8 *ft_records_get_prot(struct ft_data* data, int number) +{ + return (u_int8 *)(data->records[number] + data->offsets.prot); +} + +u_int8 *ft_records_get_tos(struct ft_data* data, int number) +{ + return (u_int8 *)(data->records[number] + data->offsets.tos); +} + +u_int8 *ft_records_get_tcp_flags(struct ft_data* data, int number) +{ + return (u_int8 *)(data->records[number] + data->offsets.tcp_flags); +} + +u_int8 *ft_records_get_engine_type(struct ft_data* data, int number) +{ + return (u_int8 *)(data->records[number] + data->offsets.engine_type); +} + +u_int8 *ft_records_get_engine_id(struct ft_data* data, int number) +{ + return (u_int8 *)(data->records[number] + data->offsets.engine_id); +} + +u_int8 *ft_records_get_src_mask(struct ft_data* data, int number) 
+{ + return (u_int8 *)(data->records[number] + data->offsets.src_mask); +} + +u_int8 *ft_records_get_dst_mask(struct ft_data* data, int number) +{ + return (u_int8 *)(data->records[number] + data->offsets.dst_mask); +} + +u_int16 *ft_records_get_src_as(struct ft_data* data, int number) +{ + return (u_int16 *)(data->records[number] + data->offsets.src_as); +} + +u_int16 *ft_records_get_dst_as(struct ft_data* data, int number) +{ + return (u_int16 *)(data->records[number] + data->offsets.dst_as); +} + +u_int8 *ft_records_get_in_encaps(struct ft_data* data, int number) +{ + return (u_int8 *)(data->records[number] + data->offsets.in_encaps); +} + +u_int8 *ft_records_get_out_encaps(struct ft_data* data, int number) +{ + return (u_int8 *)(data->records[number] + data->offsets.out_encaps); +} + +u_int32 *ft_records_get_peer_nexthop(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.peer_nexthop); +} + +u_int32 *ft_records_get_router_sc(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.router_sc); +} + +u_int32 *ft_records_get_src_tag(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.src_tag); +} + +u_int32 *ft_records_get_dst_tag(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.dst_tag); +} + +u_int32 *ft_records_get_extra_pkts(struct ft_data* data, int number) +{ + return (u_int32 *)(data->records[number] + data->offsets.extra_pkts); +} + +u_int8 *ft_records_get_marked_tos(struct ft_data* data, int number) +{ + return (u_int8 *)(data->records[number] + data->offsets.marked_tos); +} + + + +void ft_close(struct ft_data* data) +{ + int i; + + ftio_close(&data->io); + + for (i=0; i<data->numrecords; i++) { + free(data->records[i]); + } + free(data->records); + + if(data->fd) { + close(data->fd); + } + free(data); +} diff --git a/merger_validator.py b/merger_validator.py index a6a5000..053641a 100644
--- a/merger_validator.py +++ b/merger_validator.py @@ -1,6 +1,5 @@ from validator_common import * from copy import deepcopy -from tables import UIntCol from merger import MergerStorage from merger import Merger as MergerImpl from merger import MergerBranch as MergerBranchImpl @@ -9,8 +8,6 @@ from merger import MergerRejectBranch as MergerRejectBranchImpl from merger import MergerRule as MergerRuleImpl import itertools import allen_ops -import pytables -import record import options class MergerValidator(object): @@ -19,8 +16,8 @@ class MergerValidator(object): self.gr_filter_validator = gr_filter_validator self.mergers = deepcopy(parser.mergers) # The last field returns a list of the present fields for each branch - # ('rec_id', 'etime', 'stime', 'records', 'srcip', 'dstip', 'bytes', 'n', 'flags', 'srcports') - # ('rec_id', 'etime', 'stime', 'records', 'srcip', 'dstip', 'bytes', 'n', 'flags', 'dstports') + # ('rec_id', 'Last', 'First', 'records', 'srcip', 'dstip', 'bytes', 'n', 'flags', 'srcports') + # ('rec_id', 'Last', 'First', 'records', 'srcip', 'dstip', 'bytes', 'n', 'flags', 'dstports') self.branches_fields = gr_filter_validator.branches_fields # A simple dictionary mapptin of branch name to a GroupFilter # {'A': , 'B': } diff --git a/parser.py b/parser.py index 823e358..e6b0471 100644 --- a/parser.py +++ b/parser.py @@ -508,10 +508,10 @@ class Parser(object): # insert aggregation of record ids (needed for ungrouping later) p[0].aggr.insert(0,(Rule('union', p.lineno(2), [Field('rec_id'), 'records']))) - p[0].aggr.insert(0,(Rule('min', p.lineno(2), [Field('stime'), - 'stime']))) - p[0].aggr.insert(0,(Rule('max', p.lineno(2), [Field('etime'), - 'etime']))) + p[0].aggr.insert(0,(Rule('min', p.lineno(2), [Field('First'), + 'First']))) + p[0].aggr.insert(0,(Rule('max', p.lineno(2), [Field('Last'), + 'Last']))) self.groupers.append(p[0]) def p_module1_n(self, p): diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..bf15933 --- /dev/null +++ 
b/setup.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from distutils.core import setup +from distutils.extension import Extension +from Cython.Distutils import build_ext + +sourcefiles = ['ftreader.pyx', 'lib/ftreader.c'] + +ext_modules = [Extension('ftreader', sourcefiles, + libraries=['ft'], + include_dirs=['include'] + #include_dirs=['/usr/include/gwenhywfar4', '/usr/include/aqbanking5', ], + #extra_compile_args=['-Wno-cast-qual', '-Wno-strict-prototypes', ], + )] + +setup( + name = 'flowy', + version='1.32', + description='flowy network traffic analyzer', + long_description=''' +put a longer description here +''', + cmdclass = {'build_ext': build_ext}, + ext_modules = ext_modules, + download_url='http://pyneo.org/downloads/', + author='Johannes Schauer', + author_email='j.schauer@jacobs-university.de', + url='http://pyneo.org/', + data_files=[ + ('share/pyneod', ('pybankd.py', )), + ], +) + + diff --git a/timeindex.py b/timeindex.py index 2b1614a..7bb7e49 100644 --- a/timeindex.py +++ b/timeindex.py @@ -30,23 +30,23 @@ class TimeIndex(object): def len(self): return len(self.index) - def get_interval(self, stime, etime): - start = int(floor(stime/self.interval)) - end = int(floor(etime/self.interval) + 1) + def get_interval(self, First, Last): + start = int(floor(First/self.interval)) + end = int(floor(Last/self.interval) + 1) return xrange(start, end) def update_min_max_time(self, record): - if self.mintime > record.stime: - self.mintime = record.stime - if self.maxtime < record.etime: - self.maxtime = record.etime + if self.mintime > record.First: + self.mintime = record.First + if self.maxtime < record.Last: + self.maxtime = record.Last def get_total_interval(self): return self.get_interval(self.mintime, self.maxtime) def add(self, record): - interval = self.get_interval(record.stime, record.etime) + interval = self.get_interval(record.First, record.Last) for i in interval: self.index.setdefault(i, set()).add(record.rec_id) @@ -54,9 
+54,9 @@ class TimeIndex(object): if self.len > self.maxsize: print "Warning large index" - def get_interval_records(self, stime, etime): + def get_interval_records(self, First, Last): res = set() - for i in self.get_interval(stime, etime): + for i in self.get_interval(First, Last): res |= self.index.setdefault(i, set()) # set union return sorted(res) \ No newline at end of file diff --git a/ungrouper_validator.py b/ungrouper_validator.py index 1aaf33e..ccb22b6 100644 --- a/ungrouper_validator.py +++ b/ungrouper_validator.py @@ -1,9 +1,6 @@ from copy import deepcopy -from pytables import create_table_file from ungrouper import Ungrouper as UngrouperImpl -from record import RecordReader from validator_common import * -from pytables import FlowRecordsTable, create_table_file import options class UngrouperValidator(object): @@ -98,4 +95,4 @@ class UngrouperValidator(object): br_to_groups, records, output, br_to_gr_output)) - return ungr_impl \ No newline at end of file + return ungr_impl diff --git a/validator_common.py b/validator_common.py index 16bc5d3..07531a0 100644 --- a/validator_common.py +++ b/validator_common.py @@ -1,5 +1,5 @@ from statement import Rule, GrouperRule, AllenRule, Field, Arg -from pytables import FlowRecordsTable +from ftreader import FtReader import os def flatten(l): @@ -49,19 +49,20 @@ def find_op(rule, module='operators'): rule.line)) def get_input_reader(parser): +# TODO: create reader somewhere else """Returns a reader for the parser's input""" - return FlowRecordsTable(parser.input.name) # parser.input.name is the ./netflow-trace.h5 file + if not getattr(parser, "reader", None): + parser.reader = FtReader(parser.input.name) + return parser.reader -def get_input_fields_types(input_reader): - return dict((f, t) for f, t in zip(input_reader.fields, - input_reader.types)) - -def check_rule_fields(rule, fields): +def check_rule_fields(rule, reader): for arg in rule.args: if type(arg) is Field: - if arg.name in fields: + if 
reader.supports_attr(arg.name): continue else: + if arg.name == "rec_id": + continue msg = 'There is no such field %s, '%arg.name msg += 'referenced at line %s'%rule.line raise SyntaxError(msg)