1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
'''
A utility class to manage blacklisted
addresses & interact with the database.
Verified: 2021 February 7
* Follows PEP8
* Tested Platforms
* Windows 10
'''
from server.database import Database
from server.error import ServerError
from server.console import Console
from shared.state import Static
import ipaddress
class Blacklist:
__TABLE_NAME = 'blacklist'
__TABLE_KWARGS = {
'address': 'text'
}
def __init__(self, ret):
self.__ret = ret
self.__db = Database()
self.__db.create_table(Blacklist.__TABLE_NAME,
**Blacklist.__TABLE_KWARGS)
self.__db.commit()
self.__blacklist = [address[0] for address in
self.__db.read(Blacklist.__TABLE_NAME)]
@ServerError.quiet
def __del__(self):
self.__db.close()
@property
def blacklist(self):
return self.__blacklist
def add(self, address):
address = self.__verify_address(address)
if self.__address_exists(address):
return Console.printf('Address is already blacklisted',
Static.WARNING, ret=self.__ret)
else:
self.__db.write(Blacklist.__TABLE_NAME, (address,))
self.__db.commit()
return Console.printf('Blacklist address added',
Static.SUCCESS, ret=self.__ret)
def remove(self, address):
address = self.__verify_address(address)
if self.__address_exists(address):
self.__db.delete(Blacklist.__TABLE_NAME,
('address', '=', address))
self.__db.commit()
return Console.printf('Blacklist address removed',
Static.SUCCESS, ret=self.__ret)
else:
return Console.printf('Address is not blacklisted',
Static.WARNING, ret=self.__ret)
def update(self, address, new_address):
address = self.__verify_address(address)
new_address = self.__verify_address(new_address)
if self.__address_exists(address):
if self.__address_exists(new_address):
return Console.printf('New address already exists',
Static.WARNING, ret=self.__ret)
else:
self.__db.update(Blacklist.__TABLE_NAME,
('address', '=', address),
address=new_address)
self.__db.commit()
return Console.printf('Blacklist address updated',
Static.SUCCESS, ret=self.__ret)
else:
return Console.printf('Address is not blacklisted',
Static.WARNING, ret=self.__ret)
def list(self):
if self.blacklist:
headers, values = ('Address', 'IPV6', 'Link Local',
'Private', 'Loopback'), []
for address in self.blacklist:
address_obj = ipaddress.ip_address(address)
values.append((str(address_obj),
str(address_obj.version == 6),
str(address_obj.is_link_local),
str(address_obj.is_private),
str(address_obj.is_loopback)))
return Console.tabulate(values, headers, self.__ret)
else:
return Console.printf('No addresses blacklisted',
Static.INFO, ret=self.__ret)
def __address_exists(self, address):
return address in self.blacklist
def __verify_address(self, address):
return str(ipaddress.ip_address(address))
|