aboutsummaryrefslogtreecommitdiff
blob: b27a231f342539b4dcd826d1e00dc19f0b37f7ea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# (c) 2017, Alice Ferrazzi <alice.ferrazzi@gmail.com>
# Distributed under the terms of the GNU General Public License v2 or later

import gzip
import uuid
import tempfile

import os
import os.path
import re
from git import Repo

from elivepatch_client.client import restful


def id_generate_uuid():
    """
    Create a new random (version 4) UUID

    :return: the UUID in its canonical string form
    """
    return str(uuid.uuid4())


class Kernel(object):
    """
    Manage kernels files

    Keeps the paths of the kernel configuration and of the main patch and
    hands them to the elivepatch server through ``restful.ManaGer``.
    """
    def __init__(self, restserver_url, kernel_version, session_uuid=None):
        """
        :param restserver_url: server URL passed on to ``restful.ManaGer``
        :param kernel_version: kernel version passed on to ``restful.ManaGer``
        :param session_uuid: session identifier to reuse; when empty or None
            a new one is generated with ``id_generate_uuid``
        """
        self.config_fullpath = ''
        self.main_patch_fullpath = ''
        self.restserver_url = restserver_url
        self.kernel_version = kernel_version
        if session_uuid:
            self.session_uuid = session_uuid
        else:
            self.session_uuid = id_generate_uuid()
        print('This session uuid: ' + str(self.session_uuid))
        self.rest_manager = restful.ManaGer(self.restserver_url, self.kernel_version, self.session_uuid)

    def set_config(self, config_fullpath):
        """
        :param config_fullpath: path of the kernel configuration file
            (plain or gzip compressed with a ``.gz`` suffix)
        """
        self.config_fullpath = config_fullpath

    def set_main_patch(self, main_patch_fullpath):
        """
        :param main_patch_fullpath: path of the main patch file
        """
        self.main_patch_fullpath = main_patch_fullpath

    def send_files(self):
        """
        Send config and patch files

        The configuration is first copied (and uncompressed when its path
        ends with ``.gz``) into a temporary file that is created with
        ``delete=False``; that temporary file object is what gets handed to
        the rest manager.

        :return: None
        :raises IOError/OSError: when the configuration cannot be read; the
            temporary file is removed before the error propagates
        """
        temporary_config = tempfile.NamedTemporaryFile(delete=False)
        try:
            # '.gz' suffix check; equivalent to the regex "[.]gz" anchored at
            # the end of the string, without the invalid "\Z" string escape.
            if self.config_fullpath.endswith('.gz'):
                print('gz extension')
                # uncompress the gzip config file into the temporary file
                f_action = FileAction(self.config_fullpath)
                temporary_config = f_action.decompress_gz(temporary_config)
            else:
                # read already uncompressed configuration
                with open(self.config_fullpath, 'rb') as in_file:
                    config = in_file.read()
                # Store uncompressed temporary file
                temporary_config.write(config)
            # Push buffered bytes to disk: small configurations would
            # otherwise still sit in the write buffer.
            # NOTE(review): presumably the rest manager re-opens the file by
            # name - confirm; flushing is harmless either way.
            temporary_config.flush()
        except Exception:
            # delete=False means nobody else removes the file on failure
            temporary_config.close()
            os.unlink(temporary_config.name)
            raise

        # The kernel version given to the constructor is used as-is; it is
        # not parsed from the configuration file header.
        self.rest_manager.set_kernel_version(self.kernel_version)
        print('debug: kernel version = ' + self.rest_manager.get_kernel_version())

        send_api = '/elivepatch/api/v1.0/get_files'
        incremental_patches = None

        # send uncompressed config and patch files fullpath
        self.rest_manager.send_files(temporary_config, self.main_patch_fullpath, incremental_patches, send_api)

    def build_livepatch(self):
        """
        Delegate to ``rest_manager.build_livepatch``
        """
        self.rest_manager.build_livepatch()

    def get_livepatch(self):
        """
        Delegate to ``rest_manager.get_livepatch`` with the main patch path
        """
        self.rest_manager.get_livepatch(self.main_patch_fullpath)


class CVE(object):
    """
    Check the kernel against a CVE repository

    Stores the URL of a git repository and the local directory it is cloned
    into.
    """
    def __init__(self, git_url="https://github.com/nluedtke/linux_kernel_cves",
                 repo_dir="/tmp/kernel_cve/"):
        """
        :param git_url: URL of the git repository to clone
        :param repo_dir: local directory used as clone destination
        """
        self.git_url = git_url
        self.repo_dir = repo_dir

    def download(self):
        """
        Clone ``git_url`` into ``repo_dir`` with ``Repo.clone_from``

        NOTE(review): presumably this fails when ``repo_dir`` already holds
        a previous clone - confirm against the GitPython documentation.
        """
        Repo.clone_from(self.git_url, self.repo_dir)

    def set_repo(self, git_url, repo_dir):
        """
        Replace both the repository URL and the clone directory

        :param git_url: URL of the git repository to clone
        :param repo_dir: local directory used as clone destination
        """
        self.git_url = git_url
        self.repo_dir = repo_dir


class FileAction(object):
    """
    Work with files
    """
    def __init__(self, full_path):
        """
        :param full_path: path of the file the actions operate on
        """
        self.full_path = full_path

    def decompress_gz(self, temporary):
        """
        Uncompress gzipped configuration

        The whole uncompressed content of ``full_path`` is written to
        ``temporary`` and flushed. When ``full_path`` is a directory nothing
        is written.

        :param temporary: writable binary file object with a ``name``
            attribute (e.g. a ``tempfile.NamedTemporaryFile``)
        :return: the same ``temporary`` file object that was passed in
        """
        path_gz_file = self.full_path
        print('path_gz_file: '+ path_gz_file + ' temporary_path_uncompressed_file: ' +
              temporary.name)
        if not os.path.isdir(path_gz_file):
            with gzip.open(path_gz_file, 'rb') as in_file:
                uncompressed_output = in_file.read()
            # Store uncompressed file
            temporary.write(uncompressed_output)
            # make the data visible to readers that re-open the file by name
            temporary.flush()
        return temporary

    def config_kernel_version(self, uncompressed_config_file):
        """
        Find the kernel version from where the configuration as been generated

        The version is taken from the third line of the file: its third
        whitespace separated field, cut at the first '-'.
        The file is rewound before reading and is left open afterwards, as
        it belongs to the caller.

        :param uncompressed_config_file: seekable file object (text or
            binary mode) holding the uncompressed configuration
        :return: kernel version
        :raises IndexError: when the third line has fewer than three fields
        """
        uncompressed_config_file.seek(0)
        kernel_line = ''
        # the header line carrying the version is the third one
        for _ in range(3):
            kernel_line = uncompressed_config_file.readline()
        # binary mode files yield bytes; decode instead of taking their repr
        if isinstance(kernel_line, bytes):
            kernel_line = kernel_line.decode('utf-8', 'replace')
        kernel_version_raw = kernel_line.split()[2]
        kernel_version = kernel_version_raw.split('-')[0]
        return kernel_version