Last active
November 12, 2021 08:41
-
-
Save otakbeku/95f5e5ff684a6db3b1e0cdc32ae33485 to your computer and use it in GitHub Desktop.
A modified FP-Growth implementation in Python (hand-rolled FP-tree plus a pyfpgrowth-based wrapper).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import re | |
| import pyfpgrowth as fp | |
| import numpy as np | |
class treeNode:
    """Node of an FP-tree.

    Holds an item name, its occurrence count, a link to its parent, a dict
    of children keyed by item name, and `next_node`, which chains together
    all tree nodes carrying the same item (the header-table linked list).
    """

    def __init__(self, name_value, num_of_occur, parent_node):
        self.name = name_value          # item this node represents
        self.count = num_of_occur       # accumulated occurrence count
        self.next_node = None           # next node with same item; set via update_header
        self.parent_node = parent_node  # None only for the tree root
        self.children = {}              # item name -> treeNode

    def increment(self, num_of_occur):
        """Add `num_of_occur` to this node's count.

        Bug fix: the original always added 1 and ignored the argument,
        under-counting whenever a transaction set has a count > 1.
        """
        self.count += num_of_occur

    def display(self, index=1):
        """Print this subtree, indenting one space per tree level."""
        print(" " * index, self.name, " ", self.count)
        for child in self.children.values():
            child.display(index + 1)
def create_tree(dataset, mininum_support=1):
    """Build an FP-tree from `dataset` ({frozenset(items): count}).

    Returns (root, header_table) where header_table maps each frequent
    item to [support_count, first_node_of_chain], or (None, None) when
    no item reaches `mininum_support`.
    """
    # First pass: per-item support, weighted by each transaction's count.
    header_table = {}
    for transaction, txn_count in dataset.items():
        for item in transaction:
            header_table[item] = header_table.get(item, 0) + txn_count

    # Prune items below the support threshold.
    infrequent = [item for item, support in header_table.items()
                  if support < mininum_support]
    for item in infrequent:
        del header_table[item]

    frequent_item_set = set(header_table)
    print("frequent_item_set", frequent_item_set)
    if not frequent_item_set:
        return None, None

    # Re-shape each header entry to [support, head-of-node-chain].
    for item in header_table:
        header_table[item] = [header_table[item], None]

    FP_tree = treeNode('Null set', 1, None)
    # Second pass: insert each transaction, items ordered by global support.
    for transaction_set, count in dataset.items():
        localD = {item: header_table[item][0]
                  for item in transaction_set if item in frequent_item_set}
        if localD:
            ordered_items = [item for item, _ in
                             sorted(localD.items(), key=lambda kv: kv[1],
                                    reverse=True)]
            update_tree(ordered_items, FP_tree, header_table, count)
    return FP_tree, header_table
def update_tree(items, in_tree, header_table, count):
    """Insert the support-ordered item list `items` below `in_tree`.

    Existing child nodes get their counts bumped by `count`; new nodes are
    created and linked into the header-table chain for their item. Recurses
    on the remaining items.
    """
    first = items[0]
    child = in_tree.children.get(first)
    if child is not None:
        child.increment(count)
    else:
        child = treeNode(first, count, in_tree)
        in_tree.children[first] = child
        # Hook the new node into the same-item chain.
        chain_head = header_table[first][1]
        if chain_head is None:
            header_table[first][1] = child
        else:
            update_header(chain_head, child)
    if len(items) > 1:
        update_tree(items[1:], child, header_table, count)
def update_header(node_to_test: treeNode, target_node):
    """Append `target_node` at the end of the same-item node chain that
    starts at `node_to_test`."""
    current = node_to_test
    while current.next_node is not None:
        current = current.next_node
    current.next_node = target_node
def load_data(filename: str = None):
    """Read a transaction file and return a list of unique transactions,
    each a tuple of the alphabetic tokens found on one line.

    Args:
        filename: path to the data file; falls back to the historical
            default "DATA-TRANSAKSI-JAN-FEB-bp.csv" when None/empty.

    Bug fixes vs. the original:
    - `filename` was accepted but silently ignored; it is now honored.
    - `set.add(re.findall(...))` added a *list* to a set, which raises
      TypeError on every line; tokens are stored as hashable tuples.
    - the dead local `content_m` (computed then discarded) was removed.
    """
    fname = filename if filename else "DATA-TRANSAKSI-JAN-FEB-bp.csv"
    output = set()
    with open(fname) as f:
        for line in f:
            # One transaction per line; keep alphabetic item names only.
            output.add(tuple(re.findall("[a-zA-Z]+", line)))
    return list(output)
def create_init_set(dataset):
    """Convert a list of transactions into {frozenset(items): count}.

    Bug fix: the original assigned 1 unconditionally, so duplicate
    transactions collapsed to a count of 1 and support was under-counted.
    """
    ret_dict = {}
    for transaction in dataset:
        key = frozenset(transaction)
        ret_dict[key] = ret_dict.get(key, 0) + 1
    return ret_dict
| # MINING | |
def ascend_tree(leaf_node: treeNode, prefix_path: list):
    """Append the names of `leaf_node` and its ancestors to `prefix_path`,
    leaf first, stopping before the root (whose parent is None)."""
    node = leaf_node
    while node.parent_node is not None:
        prefix_path.append(node.name)
        node = node.parent_node
def find_prefix_path(base_pat, tree_node: treeNode):
    """Collect the conditional pattern base for `base_pat`.

    Walks the same-item node chain starting at `tree_node`; for each node
    maps the frozenset of its strict ancestors (the node itself excluded)
    to that node's count.
    """
    condition_path = {}
    node = tree_node
    while node is not None:
        path = []
        ascend_tree(node, path)
        if len(path) > 1:
            # path[0] is the node itself; keep only its ancestors.
            condition_path[frozenset(path[1:])] = node.count
        node = node.next_node
    return condition_path
| # CONVENTIONAL | |
class fpgrowthlib:
    """Thin wrapper around `pyfpgrowth` for mining a transaction CSV.

    The CSV is assumed to have a header row, a transaction id in column 1
    and an item name in column 6 (0-based, comma-separated) — TODO confirm
    against the actual data file; transaction ids are assumed 1-based and
    grouped consecutively in file order.
    """

    def __init__(self, fname="DATA-TRANSAKSI-JAN-FEB.csv"):
        self.__fname = fname

    def __unique(self, list1):
        """Apply np.unique to every transaction, de-duplicating its items."""
        return [np.unique(x) for x in list1]

    def loaddata(self):
        """Group item names by transaction id and return per-transaction
        unique item lists.

        NOTE(review): ids that never occur leave `None` entries in the
        result, and index 0 is never used — preserved from the original.

        Bug fix: the original advanced `cek` *before* flushing, storing each
        finished transaction under the NEXT id and overwriting/losing the
        final transaction; the flush now happens under the old id.
        """
        with open(self.__fname) as f:
            content = f.readlines()
        del content[0]  # drop header row
        content_m = [None] * len(content)
        cek = "1"       # id of the transaction currently being collected
        data_temp = []
        for c in content:
            temp = c.split(",")
            if temp[1] == cek:
                data_temp.append(temp[6].upper())
            else:
                # Flush the finished transaction under ITS OWN id.
                content_m[int(cek)] = data_temp
                cek = temp[1]
                data_temp = [temp[6].upper()]
        content_m[int(cek)] = data_temp  # flush the last transaction
        return self.__unique(content_m)

    def get_frekuensi_per_item(self):
        """Return ({ITEM: frequency}, number_of_transactions).

        Bug fix: the original tested membership with the raw item while
        keys were stored upper-cased, so counts for lower-case occurrences
        were reset to 1 instead of incremented.
        """
        with open(self.__fname) as f:
            content = f.readlines()
        del content[0]
        content_m = {}
        jumlah_data = 1
        cek = "1"
        for c in content:
            temp = c.split(",")
            item = temp[6].upper()
            content_m[item] = content_m.get(item, 0) + 1
            if temp[1] != cek:  # new transaction id encountered
                cek = temp[1]
                jumlah_data += 1
        return content_m, jumlah_data

    def get_jumlah_data(self):
        """Count distinct transaction ids (assumes ids are grouped)."""
        with open(self.__fname) as f:
            content = f.readlines()
        del content[0]
        jumlah_data = 1
        cek = "1"
        for c in content:
            temp = c.split(",")
            if temp[1] != cek:
                cek = temp[1]
                jumlah_data += 1
        return jumlah_data

    def get_support_per_item(self, data: dict, jumlah_data):
        """Convert absolute frequencies to relative support, in place,
        and return the same dict."""
        for key in data:
            data[key] = data[key] / jumlah_data
        return data

    def get_support_pattern(self, res, minsup=5, jumlah_data=10):
        """Frequent patterns with support as a fraction of `jumlah_data`."""
        patterns = fp.find_frequent_patterns(res, minsup)
        return {key: value / jumlah_data for key, value in patterns.items()}

    def generate_pattern(self, res, minsup=5):
        """Frequent patterns with absolute support counts."""
        return fp.find_frequent_patterns(res, minsup)

    def generate_association_rules(self, pattern, minconf=0.01):
        """Return rules as a list of dicts with keys 'Antecedent',
        'Consequent' and 'Confidence'."""
        rules = fp.generate_association_rules(patterns=pattern,
                                              confidence_threshold=minconf)
        return [{'Antecedent': key,
                 'Consequent': itemset[0],
                 'Confidence': itemset[1]}
                for key, itemset in rules.items()]

    def generate_lift_ratio(self):
        # TODO: not implemented in the original either.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment