#include #include #include #include #include #include #include "ftreader.h" #include "flowy.h" #include "auto_comps.h" // TODO: allow OR in filters // TODO: allow grouping and merging with more than one module /* * for bitwise operations the delta is the value with which the operation is * done as in: bitAND(flags, delta) = value */ /* * specifying two record numbers and what fields to compare * * for allen operations, the offsets are the offsets of First and Last * respectively and field_lengths are FIRST and LAST */ char **filter(struct ft_data *data, struct filter_rule *filter_rules, int num_filter_rules, size_t *num_filtered_records) { int i, j; char **filtered_records; *num_filtered_records = 0; filtered_records = (char **)malloc(sizeof(char *)**num_filtered_records); if (filtered_records == NULL) { perror("malloc"); exit(EXIT_FAILURE); } for (i = 0; i < data->num_records; i++) { for (j = 0; j < num_filter_rules; j++) { if (!filter_rules[j].func(data->records[i], filter_rules[j].field_offset, filter_rules[j].value, filter_rules[j].delta)) break; } // break if a rule did not return true if (j < num_filter_rules) continue; (*num_filtered_records)++; filtered_records = (char **)realloc(filtered_records, sizeof(char *)**num_filtered_records); if (filtered_records == NULL) { perror("malloc"); exit(EXIT_FAILURE); } filtered_records[*num_filtered_records-1] = data->records[i]; } filtered_records = (char **)realloc(filtered_records, sizeof(char *)*(*num_filtered_records+1)); if (filtered_records == NULL) { perror("malloc"); exit(EXIT_FAILURE); } filtered_records[*num_filtered_records] = NULL; return filtered_records; } struct group **grouper(char **filtered_records, size_t num_filtered_records, struct grouper_rule *group_modules, int num_group_modules, struct grouper_aggr *aggr, size_t num_group_aggr, size_t *num_groups) { struct group **groups; int i, j, k; int num_group_members; *num_groups = 1; groups = (struct group **)malloc(sizeof(struct group *)); for (i = 0; i < num_filtered_records; i++) { if (i%10000==0) { printf("%d ", i); /* printf("\r"); for (j=0; j<(i*w.ws_col/100); j++) printf("="); */ fflush(stdout); } if (filtered_records[i] == NULL) continue; groups = (struct group **)realloc(groups, sizeof(struct group*)**num_groups); groups[*num_groups-1] = (struct group *)malloc(sizeof(struct group)); if (groups[*num_groups-1] == NULL) { perror("malloc"); exit(EXIT_FAILURE); } num_group_members = 1; groups[*num_groups-1]->members = (char **)malloc(sizeof(char *)); groups[*num_groups-1]->members[0] = filtered_records[i]; for (j = i+1; j < num_filtered_records; j++) { if (i == j) // dont try to group with itself continue; if (filtered_records[j] == NULL) continue; // check all module filter rules for those two records for (k = 0; k < num_group_modules; k++) { if (!group_modules[k].func(groups[*num_groups-1], group_modules[k].field_offset1, filtered_records[j], group_modules[k].field_offset2, group_modules[k].delta)) break; } if (k < num_group_modules) continue; num_group_members++; groups[*num_groups-1]->members = (char **)realloc(groups[*num_groups-1]->members, sizeof(char *)*num_group_members); groups[*num_groups-1]->members[num_group_members-1] = filtered_records[j]; filtered_records[j] = NULL; } groups[*num_groups-1]->aggr = (struct aggr *)malloc(sizeof(struct aggr)*num_group_aggr); if (groups[*num_groups-1]->aggr == NULL) { perror("malloc"); exit(EXIT_FAILURE); } for (j = 0; j < num_group_aggr; j++) { groups[*num_groups-1]->aggr[j] = aggr[j].func(groups[*num_groups-1]->members, num_group_members, aggr[j].field_offset); } (*num_groups)++; } // add terminating group groups = (struct group **)realloc(groups, sizeof(struct group *)**num_groups); if (groups == NULL) { perror("malloc"); exit(EXIT_FAILURE); } groups[*num_groups-1] = (struct group *)malloc(sizeof(struct group)); if (groups[*num_groups-1] == NULL) { perror("malloc"); exit(EXIT_FAILURE); } groups[*num_groups-1]->num_members = 0; groups[*num_groups-1]->members = NULL; groups[*num_groups-1]->aggr = NULL; return groups; } struct group **group_filter(struct group **groups, size_t num_groups, struct gfilter_rule *rules, size_t num_gfilter_rules, size_t num_aggr, size_t *num_filtered_groups) { int i, j; struct group **filtered_groups; *num_filtered_groups = 0; filtered_groups = (struct group **)malloc(sizeof(struct group *)**num_filtered_groups); for (i = 0; i < num_aggr; i++) { for (j = 0; j < num_gfilter_rules; j++) { if (!rules[j].func(groups[i], rules[j].field, rules[j].value, rules[j].delta)) break; } if (j < num_gfilter_rules) { free(groups[i]->members); free(groups[i]->aggr); free(groups[i]); groups[i] = NULL; continue; } (*num_filtered_groups)++; filtered_groups = (struct group **)realloc(filtered_groups, sizeof(struct group *)**num_filtered_groups); filtered_groups[*num_filtered_groups-1] = groups[i]; } filtered_groups = (struct group **)realloc(filtered_groups, sizeof(struct group *)**num_filtered_groups+1); if (filtered_groups == NULL) { perror("malloc"); exit(EXIT_FAILURE); } filtered_groups[*num_filtered_groups] = groups[i]; return filtered_groups; } /* struct group **merger(struct group ***group_collections, int num_threads, struct merger_rule *filter) { struct group **group_tuples; int buffer_size; int num_group_tuples; int i, j; buffer_size = 128; group_tuples = (struct group **)malloc(sizeof(struct group *)*num_threads*buffer_size); if (group_tuples == NULL) { perror("malloc"); exit(EXIT_FAILURE); } num_group_tuples = 0; for (i = 0; group_collections[0][i]->aggr != NULL; i++) { for (j = 0; group_collections[1][j]->aggr != NULL; j++) { if (!filter[0].func(group_collections[0][i], filter[0].field1, group_collections[1][j], filter[0].field2, filter[0].delta) || !filter[1].filter(group_collections[0][i], filter[1].field1, group_collections[1][j], filter[1].field2, filter[1].delta) ) continue; if (num_group_tuples == buffer_size) { buffer_size *= 2; group_tuples = (struct group **)realloc(group_tuples, sizeof(struct group *)*num_threads*buffer_size); if (group_tuples == NULL) { perror("malloc"); exit(EXIT_FAILURE); } } group_tuples[num_group_tuples*num_threads + 0] = group_collections[0][i]; group_tuples[num_group_tuples*num_threads + 1] = group_collections[1][j]; num_group_tuples++; } } group_tuples = (struct group **)realloc(group_tuples, sizeof(struct group *)*num_threads*(buffer_size + 1)); if (group_tuples == NULL) { perror("malloc"); exit(EXIT_FAILURE); } group_tuples[num_group_tuples*num_threads + 0] = NULL; group_tuples[num_group_tuples*num_threads + 1] = NULL; // printf("number of group tuples: %d\n", num_group_tuples); return group_tuples; }*/ static void *branch_start(void *arg) { struct branch_info *binfo = (struct branch_info *)arg; struct group **groups; struct group **filtered_groups; char **filtered_records; size_t num_filtered_records; size_t num_groups; size_t num_filtered_groups; /* * FILTER */ filtered_records = filter(binfo->data, binfo->filter_rules, binfo->num_filter_rules, &num_filtered_records); printf("number of filtered records: %zd\n", num_filtered_records); /* * GROUPER */ groups = grouper(filtered_records, num_filtered_records, binfo->group_modules, binfo->num_group_modules, binfo->aggr, binfo->num_aggr, &num_groups); free(filtered_records); printf("number of groups: %zd\n", num_groups); /* * GROUPFILTER */ filtered_groups = group_filter(groups, num_groups, binfo->gfilter_rules, binfo->num_gfilter_rules, binfo->num_aggr, &num_filtered_groups); free(groups); printf("number of filtered groups: %zd\n", num_filtered_groups); pthread_exit(filtered_groups); } int main(int argc, char **argv) { struct ft_data *data; int num_threads; int i, ret; pthread_t *thread_ids; pthread_attr_t *thread_attrs; struct branch_info *binfos; struct group ***group_collections; // struct group **group_tuples; num_threads = 2; data = ft_open(STDIN_FILENO); binfos = (struct branch_info *)calloc(num_threads, sizeof(struct branch_info)); if (binfos == NULL) { perror("malloc"); exit(EXIT_FAILURE); } /* * custom rules */ struct filter_rule filter_rules_branch1[1] = { { data->offsets.dstport, 80, 0, filter_eq_uint16_t }, }; struct grouper_rule group_module_branch1[2] = { { data->offsets.srcaddr, data->offsets.srcaddr, 0, grouper_eq_uint32_t }, { data->offsets.dstaddr, data->offsets.dstaddr, 0, grouper_eq_uint32_t }, // { data->offsets.Last, data->offsets.First, 1, grouper_lt_uint32_t_rel } }; struct grouper_aggr group_aggr_branch1[4] = { { 0, data->offsets.srcaddr, aggr_static_uint32_t }, { 0, data->offsets.dstaddr, aggr_static_uint32_t }, { 0, data->offsets.dOctets, aggr_sum_uint32_t }, { 0, data->offsets.tcp_flags, aggr_or_uint16_t } }; struct gfilter_rule gfilter_branch1[0] = { }; binfos[0].data = data; binfos[0].filter_rules = filter_rules_branch1; binfos[0].num_filter_rules = 1; binfos[0].group_modules = group_module_branch1; binfos[0].num_group_modules = 2; binfos[0].aggr = group_aggr_branch1; binfos[0].num_aggr = 4; binfos[0].gfilter_rules = gfilter_branch1; binfos[0].num_gfilter_rules = 0; struct filter_rule filter_rules_branch2[1] = { { data->offsets.srcport, 80, 0, filter_eq_uint16_t }, }; struct grouper_rule group_module_branch2[2] = { { data->offsets.srcaddr, data->offsets.srcaddr, 0, grouper_eq_uint32_t }, { data->offsets.dstaddr, data->offsets.dstaddr, 0, grouper_eq_uint32_t }, // { data->offsets.Last, data->offsets.First, 1, grouper_lt_uint32_t_rel }, }; struct grouper_aggr group_aggr_branch2[4] = { { 0, data->offsets.srcaddr, aggr_static_uint32_t }, { 0, data->offsets.dstaddr, aggr_static_uint32_t }, { 0, data->offsets.dOctets, aggr_sum_uint32_t }, { 0, data->offsets.tcp_flags, aggr_or_uint16_t } }; struct gfilter_rule gfilter_branch2[0] = { }; binfos[1].data = data; binfos[1].filter_rules = filter_rules_branch2; binfos[1].num_filter_rules = 1; binfos[1].group_modules = group_module_branch2; binfos[1].num_group_modules = 2; binfos[1].aggr = group_aggr_branch2; binfos[1].num_aggr = 4; binfos[1].gfilter_rules = gfilter_branch2; binfos[0].num_gfilter_rules = 0; /* * SPLITTER * * (mostly pthread stuff) */ thread_ids = (pthread_t *)calloc(num_threads, sizeof(pthread_t)); if (thread_ids == NULL) { perror("malloc"); exit(EXIT_FAILURE); } thread_attrs = (pthread_attr_t *)calloc(num_threads, sizeof(pthread_attr_t)); if (thread_attrs == NULL) { perror("malloc"); exit(EXIT_FAILURE); } group_collections = (struct group ***)malloc(num_threads*sizeof(struct group **)); if (group_collections == NULL) { perror("malloc"); exit(EXIT_FAILURE); } for (i = 0; i < num_threads; i++) { ret = pthread_attr_init(&thread_attrs[i]); if (ret != 0) { errno = ret; perror("pthread_attr_init"); exit(EXIT_FAILURE); } ret = pthread_create(&thread_ids[i], &thread_attrs[i], &branch_start, (void *)(&binfos[i])); if (ret != 0) { errno = ret; perror("pthread_create"); exit(EXIT_FAILURE); } ret = pthread_attr_destroy(&thread_attrs[i]); if (ret != 0) { errno = ret; perror("pthread_attr_destroy"); exit(EXIT_FAILURE); } } for (i = 0; i < num_threads; i++) { ret = pthread_join(thread_ids[i], (void **)(&group_collections[i])); if (ret != 0) { errno = ret; perror("pthread_join"); exit(EXIT_FAILURE); } } exit(EXIT_SUCCESS); free(thread_ids); free(thread_attrs); free(binfos); /* * MERGER */ /* struct merger_filter_rule mfilter[3] = { { 0, 0, 1, 1, 0, mfilter_equal }, { 0, 2, 1, 2, 0, mfilter_lessthan }, };*/ // group_tuples = merger(group_collections, num_threads, mfilter); /* * UNGROUPER */ // TODO: free group_collections at some point exit(EXIT_SUCCESS); }