# StegBase.py
"""
This module implements a common base class of the steganographic
algorithms which embed data into JPEG files.
In order to run plugins inheriting from this class,
you must build the rw_dct.so library, which interfaces with libjpeg.
To do so, just run
% python setup.py build
in stegotool/util and copy the rw_dct.so
library (which can be found somewhere in the build directory) to
stegotool/util.

"""
import numpy as np
import time
import os
import sys
import random
import re

import mjsteg
import jpegObj
from common import *


class StegBase(object):
    """
    This is the base class for some of the JPEG-algorithms that behave
    similarly such as JSteg, OutGuess and F3.

    The secret payload is stored as a byte sequence prefixed with a
    4-byte little-endian length header. The stored length counts the
    header itself, which is why extraction drops the first four bytes.

    ``self.t0`` is only initialised to ``None`` here; it has to be set to
    a ``time.time()`` timestamp before ``_post_embed_actions`` or
    ``_post_extract_actions`` is called, because both subtract it from
    the current time to report the elapsed duration.
    """

    def __init__(self):
        """
        Constructor of the StegBase class.
        """
        self.t0 = None          # start timestamp, set by the caller
        self.cov_jpeg = None    # jpegObj.Jpeg wrapper of the cover image
        self.cov_data = None    # DCT coefficient blocks of the cover image
        self.hid_data = None    # header + payload bytes to embed

    def _get_cov_data(self, img_path):
        """
        Load the JPEG at ``img_path`` and return its DCT coefficients.

        The Jpeg object is kept in ``self.cov_jpeg`` so the modified
        coefficients can be written back later.
        """
        self.cov_jpeg = jpegObj.Jpeg(img_path)
        self.cov_data = self.cov_jpeg.getCoefBlocks()
        return self.cov_data

    def _get_hid_data(self, src_hidden):
        """
        Return the secret data as a byte sequence.

        The file ``src_hidden`` is read as raw bytes and prefixed with a
        4-byte little-endian header holding the total length (header
        included). The result is stored in ``self.hid_data``.

        Raises:
            Exception: if the payload needs more bits than there are
                DCT coefficients in ``self.cov_data``.
        """
        payload = np.fromfile(src_hidden, np.uint8).tolist()
        total_size = len(payload) + 4
        # Integer shift/mask keeps the header bytes integral on both
        # Python 2 and Python 3 (a plain "/" would yield floats on 3).
        header = [(total_size >> (8 * i)) & 0xFF for i in range(4)]
        self.hid_data = np.array(header + payload)

        needed_bits = np.size(self.hid_data) * 8
        available = np.size(self.cov_data)
        if needed_bits > available:
            raise Exception("Cover image is too small to embed data.Cannot fit %d bits in %d DCT coefficients" % (
                needed_bits, available))
        return self.hid_data

    def _post_embed_actions(self, src_cover, src_hidden, tgt_stego):
        """
        Embed ``src_hidden`` into ``src_cover`` and write ``tgt_stego``.

        This function isn't named very accurately. It actually calls the
        _raw_embed function in inherited classes.

        Raises:
            TypeError: propagated unchanged from ``_raw_embed``.
            Exception: with a "DCT coefficients exhausted" message when
                ``_raw_embed`` fails for any other reason. Errors from
                loading the cover, reading the payload (including the
                capacity check) or writing the result propagate with
                their own message instead of being relabelled.
        """
        cov_data = self._get_cov_data(src_cover)
        hid_data = self._get_hid_data(src_hidden)

        # Only the embedding step itself can "run out" of coefficients,
        # so only that step is translated into the exhaustion message.
        try:
            cov_data = self._raw_embed(cov_data, hid_data)
        except TypeError:
            raise
        except Exception:
            raise Exception(
                'DCT coefficients exhausted. This usually means there are not '
                'enough DCT coefficients in the image in which algorithm can '
                'actually embed data. You should choose a larger image.')

        self.cov_jpeg.setCoefBlocks(cov_data)
        self.cov_jpeg.Jwrite(tgt_stego)

        size = os.path.getsize(tgt_stego)
        size_embedded = np.size(hid_data)

        self._display_stats("embedded", size, size_embedded,
                            time.time() - self.t0)

    def _post_extract_actions(self, src_steg, tgt_hidden):
        """
        Extract the hidden payload from ``src_steg`` into ``tgt_hidden``.

        This function isn't named very accurately. It actually calls the
        _raw_extract function in inherited classes.

        Raises:
            Exception: "Supposed secret data too large ..." when the
                decoded length header exceeds the number of DCT
                coefficients, or "DCT coefficients exhausted ..." when
                the raw extraction itself fails.
        """
        corrupt_msg = 'DCT coefficients exhausted.The stego image is probably corrupted.'

        steg_data = self._get_cov_data(src_steg)
        emb_size = os.path.getsize(src_steg)

        # recovering file size from the 4-byte little-endian header
        header_size = 4 * 8
        try:
            size_data = bits2bytes(self._raw_extract(steg_data, header_size))
            # int() avoids overflow should bits2bytes return narrow
            # NumPy integers -- TODO confirm its return type.
            size_hd = sum(int(size_data[i]) * 256 ** i for i in range(4))
        except Exception:
            raise Exception(corrupt_msg)

        raw_size = size_hd * 8

        if raw_size > np.size(steg_data):
            raise Exception("Supposed secret data too large for stego image.")

        try:
            hid_data = bits2bytes(self._raw_extract(steg_data, raw_size))
        except Exception:
            raise Exception(corrupt_msg)

        # The first four bytes are the length header, not payload.
        hid_data[4:].tofile(tgt_hidden)

        self._display_stats("extracted", emb_size,
                            np.size(hid_data),
                            time.time() - self.t0)

    def _looks_like_jpeg(self, path):
        """
        Return True if the file at ``path`` starts with the JPEG SOI
        marker (bytes FF D8), False otherwise or if it cannot be opened.
        """
        try:
            # Binary mode: the magic number is raw bytes, not text.
            with open(path, 'rb') as f:
                return f.read(2) == b'\xff\xd8'
        except IOError:
            return False

    def _display_stats(self, verb, cov_size, emb_size, duration):
        """
        Print a one-line summary of an embed/extract operation.

        ``verb`` is inserted verbatim into the message, ``cov_size`` and
        ``emb_size`` are byte counts and ``duration`` is in seconds.
        A zero ``duration`` is reported as an infinite rate and a zero
        ``cov_size`` as a ratio of 0 instead of raising
        ZeroDivisionError.
        """
        if duration != 0:
            rate = (float(emb_size) / duration) / 1000.
        else:
            rate = float("inf")
        ratio = float(emb_size) / cov_size if cov_size else 0.0
        print(
            "%dB %s in %.2fs (%.2f kBps), embedding ratio: %.4f" %
            (emb_size, verb, duration, rate, ratio))

    # dummy functions to please pylint
    def _raw_embed(self, cov_data, hid_data, status_begin=0):
        """Placeholder; does nothing and returns None in this base class."""
        pass

    def _raw_extract(self, steg_data, num_bits):
        """Placeholder; does nothing and returns None in this base class."""
        pass

    def _dummy_embed_hook(self, cov_data, hid_data):
        """No-op hook."""
        pass

    def _dummy_extract_hook(self, steg_data, num_bits):
        """No-op hook."""
        pass