1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# (c) 2017, Alice Ferrazzi <alice.ferrazzi@gmail.com>
# Distributed under the terms of the GNU General Public License v2 or later
import gzip
import uuid
import tempfile
import os
import os.path
import re
from git import Repo
from elivepatch_client.client import restful
def id_generate_uuid():
generated_uuid = str(uuid.uuid4())
return generated_uuid
class Kernel(object):
"""
Manage kernels files
"""
def __init__(self, restserver_url, session_uuid=None):
self.config_fullpath = ''
self.patch_fullpath = ''
self.restserver_url = restserver_url
self.kernel_version = None
if session_uuid:
self.session_uuid = session_uuid
else:
self.session_uuid = id_generate_uuid()
print('This session uuid: ' + str(self.session_uuid))
self.rest_manager = restful.ManaGer(self.restserver_url, self.kernel_version, self.session_uuid)
def set_config(self, config_fullpath):
self.config_fullpath = config_fullpath
def set_patch(self, patch_fullpath):
self.patch_fullpath = patch_fullpath
def send_files(self):
"""
Send config and patch files
:return: void
"""
f_action = FileAction(self.config_fullpath)
temporary_config = tempfile.NamedTemporaryFile(delete=False)
# check the configuration file
# TODO: make it more compact
if re.findall("[.]gz\Z", self.config_fullpath):
print('gz extension')
# uncompress the gzip config file
# return configuration temporary folder
temporary_config = f_action.ungz(temporary_config)
else:
# read already uncompressed configuration
with open(self.config_fullpath, 'rb') as in_file:
config = in_file.read()
# Store uncompressed temporary file
temporary_config.write(config)
# Get kernel version from the configuration file header
self.kernel_version = f_action.config_kernel_version(temporary_config)
self.rest_manager.set_kernel_version(self.kernel_version)
print('debug: kernel version = ' + self.rest_manager.get_kernel_version())
send_api = '/elivepatch/api/v1.0/get_files'
# send uncompressed config and patch files fullpath
self.rest_manager.send_file(temporary_config, self.patch_fullpath, send_api)
def build_livepatch(self):
self.rest_manager.build_livepatch()
def get_livepatch(self):
self.rest_manager.get_livepatch()
class CVE(object):
"""
Check the kernel against a CVE repository
"""
def __init__(self):
self.git_url = "https://github.com/nluedtke/linux_kernel_cves"
self.repo_dir = "/tmp/kernel_cve/"
pass
def download(self):
Repo.clone_from(self.git_url, self.repo_dir)
def set_repo(self, git_url, repo_dir):
self.git_url = git_url
self.repo_dir = repo_dir
class FileAction(object):
"""
Work with files
"""
def __init__(self, full_path):
self.full_path = full_path
pass
def ungz(self, temporary):
"""
Uncompress gzipped configuration
:return: Uncompressed configuration file path
"""
path_gz_file = self.full_path
print('path_gz_file: '+ path_gz_file + ' temporary_path_uncompressed_file: ' +
temporary.name)
if not os.path.isdir(path_gz_file):
with gzip.open(path_gz_file, 'rb') as in_file:
uncompressed_output = in_file.read()
# Store uncompressed file
temporary.write(uncompressed_output)
return temporary
def config_kernel_version(self, uncompressed_config_file):
"""
Find the kernel version from where the configuration as been generated
:param uncompressed_config_file:
:return: kernel version
"""
uncompressed_config_file.seek(0)
with uncompressed_config_file as f:
i = 0
while i < 2:
f.readline()
if i == 1:
kernel_line = str(f.readline())
i += 1
kernel_version_raw = str(kernel_line.split(' ')[2])
kernel_version = kernel_version_raw.split(('-'))[0]
return kernel_version
|