summaryrefslogtreecommitdiff
path: root/server/blacklist.py
blob: 84b8fb216d0d24d22e81bbb0e4818d55d9ed0bb7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
'''
    A utility class to manage blacklisted
    addresses & interact with the database.

    Verified: 2021 February 7
    * Follows PEP8
    * Tested Platforms
        * Windows 10
'''

from server.database import Database
from server.error import ServerError
from server.console import Console
from shared.state import Static

import ipaddress


class Blacklist:

    __TABLE_NAME = 'blacklist'
    __TABLE_KWARGS = {
        'address': 'text'
    }

    def __init__(self, ret):
        self.__ret = ret

        self.__db = Database()
        self.__db.create_table(Blacklist.__TABLE_NAME,
                               **Blacklist.__TABLE_KWARGS)
        self.__db.commit()

        self.__blacklist = [address[0] for address in
                            self.__db.read(Blacklist.__TABLE_NAME)]

    @ServerError.quiet
    def __del__(self):
        self.__db.close()

    @property
    def blacklist(self):
        return self.__blacklist

    def add(self, address):
        address = self.__verify_address(address)

        if self.__address_exists(address):
            return Console.printf('Address is already blacklisted',
                                  Static.WARNING, ret=self.__ret)
        else:
            self.__db.write(Blacklist.__TABLE_NAME, (address,))
            self.__db.commit()

            return Console.printf('Blacklist address added',
                                  Static.SUCCESS, ret=self.__ret)

    def remove(self, address):
        address = self.__verify_address(address)

        if self.__address_exists(address):
            self.__db.delete(Blacklist.__TABLE_NAME,
                             ('address', '=', address))
            self.__db.commit()

            return Console.printf('Blacklist address removed',
                                  Static.SUCCESS, ret=self.__ret)
        else:
            return Console.printf('Address is not blacklisted',
                                  Static.WARNING, ret=self.__ret)

    def update(self, address, new_address):
        address = self.__verify_address(address)
        new_address = self.__verify_address(new_address)

        if self.__address_exists(address):
            if self.__address_exists(new_address):
                return Console.printf('New address already exists',
                                      Static.WARNING, ret=self.__ret)
            else:
                self.__db.update(Blacklist.__TABLE_NAME,
                                 ('address', '=', address),
                                 address=new_address)
                self.__db.commit()
                return Console.printf('Blacklist address updated',
                                      Static.SUCCESS, ret=self.__ret)
        else:
            return Console.printf('Address is not blacklisted',
                                  Static.WARNING, ret=self.__ret)

    def list(self):
        if self.blacklist:
            headers, values = ('Address', 'IPV6', 'Link Local',
                               'Private', 'Loopback'), []

            for address in self.blacklist:
                address_obj = ipaddress.ip_address(address)

                values.append((str(address_obj),
                               str(address_obj.version == 6),
                               str(address_obj.is_link_local),
                               str(address_obj.is_private),
                               str(address_obj.is_loopback)))

            return Console.tabulate(values, headers, self.__ret)
        else:
            return Console.printf('No addresses blacklisted',
                                  Static.INFO, ret=self.__ret)

    def __address_exists(self, address):
        return address in self.blacklist

    def __verify_address(self, address):
        return str(ipaddress.ip_address(address))