Debugging Crypto config and temp directory creation
Attempting to get stub file creation and copying working. Fixed syntax errors, and now attempting to get password reading working in the test environment.
This commit is contained in:
parent
ad37727c6b
commit
b56bb83558
5 changed files with 111 additions and 51 deletions
|
@ -16,3 +16,7 @@ test-proxy:
|
|||
|
||||
test-comm:
|
||||
py.test -v -rw --twisted tests/test_comm.py
|
||||
|
||||
test-crypto:
|
||||
py.test -v -rw --twisted tests/test_crypto.py
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import glob
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
|
@ -175,7 +176,7 @@ class PappyConfig(object):
|
|||
self.crypt_dir = os.path.join(os.getcwd(), 'crypt')
|
||||
self.crypt_file = 'project.crypt'
|
||||
self.crypt_mode = None
|
||||
self.salt = os.urandom(16)
|
||||
self.salt = os.urandom(16)
|
||||
self.salt_file = 'project.salt'
|
||||
|
||||
def get_default_config(self):
|
||||
|
@ -186,12 +187,14 @@ class PappyConfig(object):
|
|||
return settings
|
||||
|
||||
def get_project_files(self):
|
||||
file_glob = glob.glob('*')
|
||||
pp = os.path.join(os.getcwd())
|
||||
project_files = [pp+f for f in file_glob if os.path.isfile(pp+f)]
|
||||
project_files.remove(self.salt_file)
|
||||
project_files.remove(self.crypt_file)
|
||||
return project_files
|
||||
file_glob = glob.glob('*')
|
||||
pp = os.path.join(os.getcwd())
|
||||
project_files = [pp+f for f in file_glob if os.path.isfile(pp+f)]
|
||||
if self.salt_file in project_files:
|
||||
project_files.remove(self.salt_file)
|
||||
if self.crypt_file in project_files:
|
||||
project_files.remove(self.crypt_file)
|
||||
return project_files
|
||||
|
||||
|
||||
@staticmethod
|
||||
|
|
|
@ -10,7 +10,7 @@ import twisted
|
|||
|
||||
from . import compress
|
||||
from base64 import b64encode, b64decode
|
||||
from cryptography import Fernet
|
||||
from cryptography.fernet import Fernet
|
||||
from twisted.internet import reactor, defer
|
||||
|
||||
class Crypto(object):
|
||||
|
@ -18,16 +18,20 @@ class Crypto(object):
|
|||
self.config = sessconfig
|
||||
self.archive = self.config.archive
|
||||
self.compressor = compress.Compress(sessconfig)
|
||||
self.key = None
|
||||
self.password = None
|
||||
self.salt = None
|
||||
|
||||
def encrypt_project(passwd):
|
||||
def encrypt_project(self):
|
||||
"""
|
||||
Compress and encrypt the project files, deleting clear-text files afterwards
|
||||
"""
|
||||
# Derive the key
|
||||
key = crypto_ramp_up(passwd)
|
||||
|
||||
# Get the password and salt, then derive the key
|
||||
self.crypto_ramp_up()
|
||||
|
||||
# Instantiate the crypto module
|
||||
fern = Fernet(key)
|
||||
fern = Fernet(self.key)
|
||||
|
||||
# Create project archive and crypto archive
|
||||
self.compressor.compress_project()
|
||||
|
@ -39,24 +43,26 @@ class Crypto(object):
|
|||
archive_crypt.write(crypt_token)
|
||||
|
||||
# Delete clear-text files
|
||||
delete_clear_files()
|
||||
# delete_clear_files()
|
||||
|
||||
# Leave crypto working directory
|
||||
os.chdir('../')
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def decrypt_project(passwd):
|
||||
def decrypt_project(self):
|
||||
"""
|
||||
Decrypt and decompress the project files
|
||||
"""
|
||||
|
||||
# Get the password and salt, then derive the key
|
||||
self.crypto_ramp_up()
|
||||
|
||||
# Create crypto working directory
|
||||
crypto_path = os.path.join(os.getcwd(), pappy_config.crypt_dir)
|
||||
crypto_path = os.path.join(os.getcwd(), self.config.crypt_dir)
|
||||
os.mkdir(crypto_path)
|
||||
|
||||
if os.path.isfile(self.config.crypt_file):
|
||||
# Derive the key
|
||||
key = crypto_ramp_up(passwd)
|
||||
key = self.crypto_ramp_up()
|
||||
fern = Fernet(key)
|
||||
|
||||
# Decrypt the project archive
|
||||
|
@ -71,18 +77,14 @@ class Crypto(object):
|
|||
for pf in project_files:
|
||||
shutil.copy2(pf, crypto_path)
|
||||
os.chdir(crypto_path)
|
||||
|
||||
|
||||
def crypto_ramp_up(self):
|
||||
if not self.password:
|
||||
self.get_password()
|
||||
self.set_salt()
|
||||
self.derive_key()
|
||||
|
||||
def crypto_ramp_up(passwd):
|
||||
salt = ""
|
||||
if os.path.isfile(self.config.salt_file):
|
||||
salt = get_salt()
|
||||
else:
|
||||
salt = create_salt_file()
|
||||
key = derive_key(passwd, salt)
|
||||
return key
|
||||
|
||||
def delete_clear_files():
|
||||
def delete_clear_files(self):
|
||||
"""
|
||||
Deletes all clear-text files left in the project directory.
|
||||
"""
|
||||
|
@ -90,7 +92,7 @@ class Crypto(object):
|
|||
for pf in project_files:
|
||||
os.remove(pf)
|
||||
|
||||
def delete_crypt_files():
|
||||
def delete_crypt_files(self):
|
||||
"""
|
||||
Deletes all encrypted-text files in the project directory.
|
||||
Forces generation of new salt after opening and closing the project.
|
||||
|
@ -99,22 +101,29 @@ class Crypto(object):
|
|||
os.remove(self.config.salt_file)
|
||||
os.remove(self.config.crypt_file)
|
||||
|
||||
def create_salt_file():
|
||||
self.config.salt = urandom(16)
|
||||
def create_salt_file(self):
|
||||
salt_file = open(self.config.salt_file, 'wb')
|
||||
|
||||
if not self.config.salt:
|
||||
self.set_salt()
|
||||
|
||||
salt_file.write(self.config.salt)
|
||||
salt_file.close()
|
||||
return salt
|
||||
|
||||
def get_salt():
|
||||
def set_salt_from_file(self):
|
||||
try:
|
||||
salt_file = open(self.config.salt_file, 'rb')
|
||||
salt = salt_file.readline()
|
||||
self.config.salt = salt_file.readline().strip()
|
||||
except:
|
||||
raise PappyException("Unable to read pappy.salt")
|
||||
return salt
|
||||
raise PappyException("Unable to read project.salt")
|
||||
|
||||
def set_salt(self):
|
||||
if os.path.isfile(self.config.salt_file):
|
||||
self.set_salt_from_file()
|
||||
else:
|
||||
self.config.salt = os.urandom(16)
|
||||
|
||||
def get_password():
|
||||
def get_password(self):
|
||||
"""
|
||||
Retrieve password from the user. Raise an exception if the
|
||||
password is not capable of utf-8 encoding.
|
||||
|
@ -122,12 +131,11 @@ class Crypto(object):
|
|||
encoded_passwd = ""
|
||||
try:
|
||||
passwd = raw_input("Enter a password: ")
|
||||
encode_passwd = passwd.encode("utf-8")
|
||||
self.password = passwd.encode("utf-8")
|
||||
except:
|
||||
raise PappyException("Invalid password, try again")
|
||||
return encoded_passwd
|
||||
|
||||
def derive_key(passwd, salt):
|
||||
def derive_key(self):
|
||||
"""
|
||||
Derive a key sufficient for use as a cryptographic key
|
||||
used to encrypt the project (currently: cryptography.Fernet).
|
||||
|
@ -156,9 +164,8 @@ class Crypto(object):
|
|||
- scrypt.error if scrypt failed
|
||||
"""
|
||||
|
||||
derived_key = ""
|
||||
try:
|
||||
dkey = scrypt.hash(passwd, salt, bufflen=32)
|
||||
if not self.key:
|
||||
self.key = scrypt.hash(self.password, self.salt, bufflen=32)
|
||||
except e:
|
||||
raise PappyException("Error deriving the key: ", e)
|
||||
return derived_key
|
||||
|
|
|
@ -64,7 +64,7 @@ class PappySession(object):
|
|||
self.dbpool = None
|
||||
self.delete_data_on_quit = False
|
||||
self.ports = None
|
||||
self.crypto = Crypto(sessconfig)
|
||||
self.crypto = crypto.Crypto(sessconfig)
|
||||
self.password = None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -145,16 +145,11 @@ class PappySession(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def encrypt(self):
|
||||
if self.password:
|
||||
self.crypto.encrypt_project(self.password)
|
||||
else:
|
||||
self.password = self.crypto.get_password()
|
||||
self.crypto.encrypt_project(self.password)
|
||||
self.crypto.encrypt_project(self.password)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def decrypt(self):
|
||||
self.password = self.crypto.get_password()
|
||||
self.crypto.decrypt_project(self.password)
|
||||
self.crypto.decrypt_project()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def cleanup(self, ignored=None):
|
||||
|
|
51
pappyproxy/tests/test_crypto.py
Normal file
51
pappyproxy/tests/test_crypto.py
Normal file
|
@ -0,0 +1,51 @@
|
|||
import os
|
||||
import pytest
|
||||
import random
|
||||
import string
|
||||
from pappyproxy.session import Session
|
||||
from pappyproxy.crypto import Crypto
|
||||
from pappyproxy.config import PappyConfig
|
||||
|
||||
@pytest.fixture
|
||||
def conf():
|
||||
c = PappyConfig()
|
||||
return c
|
||||
|
||||
@pytest.fixture
|
||||
def crypt():
|
||||
c = Crypto(conf())
|
||||
return c
|
||||
|
||||
@pytest.fixture
|
||||
def tmpname():
|
||||
cns = string.ascii_lowercase + string.ascii_uppercase + string.digits
|
||||
tn = ''
|
||||
for i in xrange(8):
|
||||
tn += cns[random.randint(0,len(cns)-1)]
|
||||
return tn
|
||||
|
||||
tmpdir = '/tmp/test_crypto'+tmpname()
|
||||
tmpfiles = ['cmdhistory', 'config.json', 'data.db']
|
||||
|
||||
def stub_files():
|
||||
enter_tmpdir()
|
||||
for sf in tmpfiles:
|
||||
with os.fdopen(os.open(sf, os.O_CREAT, 0o0600), 'r'):
|
||||
pass
|
||||
|
||||
def enter_tmpdir():
|
||||
if not os.path.isdir(tmpdir):
|
||||
os.mkdir(tmpdir)
|
||||
os.chdir(tmpdir)
|
||||
|
||||
def test_decrypt_tmpdir():
|
||||
enter_tmpdir()
|
||||
crypt().decrypt_project()
|
||||
assert os.path.isdir(os.path.join(os.getcwd(), '../crypt'))
|
||||
|
||||
def test_decrypt_copy_files():
|
||||
enter_tmpdir()
|
||||
stub_files()
|
||||
crypt().decrypt_project()
|
||||
for tf in tmpfiles:
|
||||
assert os.path.isfile(tf)
|
Loading…
Add table
Add a link
Reference in a new issue