This commit is contained in:
osiris account 2023-03-12 15:29:57 -07:00
parent 7cb7a479f6
commit 69bb4175f1
124 changed files with 20 additions and 15 deletions

View file

@ -0,0 +1,26 @@
#!/usr/bin/env python
import argparse
def main():
description = 'Describe what your app does here'
# Run CLI menu.
parser = argparse.ArgumentParser(description=description)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-e', '--encode', type=int, help="some help here")
group.add_argument('-d', '--decode', help="another help here")
args = parser.parse_args()
print(args.encode)
print(args.decode)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,24 @@
.PHONY: clean-pyc clean-dit clean test dist version
default: test
clean-pyc:
@find . -iname '*.py[co]' -delete
@find . -iname '__pycache__' -delete
clean-dist:
@rm -rf dist/
@rm -rf build/
@rm -rf *.egg-info
clean: clean-pyc clean-dist
test:
pytest -vvv
dist: clean
python3 setup.py sdist
python3 setup.py bdist_wheel
version: dist
python3 setup.py --version

View file

@ -0,0 +1,41 @@
# Click / CLI Boilerplate
A quick boilerplate when creating a new CLI apps/scripts in Python3.
### Install
Create and source a virtual environment:
```
virtualenv venv
source venv/bin/activate
```
Install dependencies:
```
pip install -r requirements.txt
```
Install package:
```
pip install .
```
Test your install with:
```
yourapp --help
```
---
### Development
Create a virtual environment and install the package with an editable option:
```
pip install --editable .
```

View file

@ -0,0 +1,5 @@
[pytest]
norecursedirs = venv* .*
addopts =
-r fEsxXw
-vvv

View file

@ -0,0 +1 @@
Click

View file

@ -0,0 +1,16 @@
from setuptools import setup, find_packages
setup(
name='yourapp',
version='0.0.1',
packages=find_packages(),
include_package_data=True,
author='steinkirch',
install_requires=[
'Click',
],
entry_points='''
[console_scripts]
src=src.main:main
''',
)

View file

@ -0,0 +1,32 @@
#!/usr/bin/env python3
import click
@click.command()
@click.option('-s',
'--source',
default='dev',
nargs=1,
show_default=True,
help='Some source string.')
@click.option('-t',
'--target',
default='staging',
nargs=1,
show_default=True,
help='Some target string.')
@click.option('-p',
'--services',
required=True)
def main(source, target, services):
print(source, target, services)
if __name__ == "__main__":
main(source, target, services)

View file

@ -0,0 +1,40 @@
## Concurrency and Parallelism in Python
<br>
* [Read a detailed explanation on threads and multiprocessing in Python in my book](https://github.com/go-outside-labs/algorithms-book)
<br>
### Threading
* Threading is a feature usually provided by the operating system.
* Threads are lighter than processes, and share the same memory space.
* With threading, concurrency is achieved using multiple threads, but due to the GIL only one thread can be running at a time.
* If your code is IO-heavy (like HTTP requests), then multithreading will still probably speed up your code.
### Multi-processing
* In multiprocessing, the original process is forked process into multiple child processes bypassing the GIL.
* Each child process will have a copy of the entire program's memory.
* If your code is performing a CPU bound task, such as decompressing gzip files, using the threading module will result in a slower execution time. For CPU bound tasks and truly parallel execution, use the multiprocessing module.
* Higher memory overhead than threading.
### RQ: queueing jobs
* [RQ](https://python-rq.org/) is aimple but powerful library.
* You first enqueue a function and its arguments using the library. This pickles the function call representation, which is then appended to a Redis list.
### Celery: queueing jobs
* Celery is one of the most popular background job managers in the Python world.
* Compatible with several message brokers like RabbitMQ or Redis and can act as both producer and consumer.
* Asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operations but supports scheduling as well.
### concurrent.futures
* Using a concurrent.futures.ThreadPoolExecutor makes the Python threading example code almost identical to the multiprocessing module.

View file

@ -0,0 +1,14 @@
#!/usr/bin/env python3
import asyncio
async def delayed_hello():
print('Hello ')
await asyncio.sleep(1)
print('World!')
loop = asyncio.get_event_loop()
loop.run_until_complete(delayed_hello())
loop.close()

View file

@ -0,0 +1,19 @@
#!/usr/bin/env python3
from time import sleep
from concurrent.futures import ThreadPoolExecutor
def return_after_5_secs(message):
sleep(5)
return message
pool = ThreadPoolExecutor(3)
future = pool.submit(return_after_5_secs, ('Future message'))
print(future.done())
sleep(5)
print(future.done())
print(future.result())

View file

@ -0,0 +1,39 @@
#!/usr/bin/env python3
import time
import sys
import multiprocessing
def daemon():
p = multiprocessing.current_process()
print('Starting: {}, {}'.format(p.name, p.pid))
sys.stdout.flush()
time.sleep(1)
print('Exiting : {}, {}'.format(p.name, p.pid))
sys.stdout.flush()
def non_daemon():
p = multiprocessing.current_process()
print('Starting: {}, {}'.format(p.name, p.pid))
sys.stdout.flush()
print('Exiting : {}, {}'.format(p.name, p.pid))
sys.stdout.flush()
if __name__ == '__main__':
d = multiprocessing.Process(name='daemon', target=daemon)
d.daemon = True
n = multiprocessing.Process(name='non-daemon', target=non_daemon)
n.daemon = False
d.start()
time.sleep(1)
n.start()

View file

@ -0,0 +1,12 @@
#!/usr/bin/env python3
import threading
l = threading.Lock()
print("Before first lock acquire.")
l.acquire()
print("Before second lock acquire.")
l.acquire()
print("Lock was acquired twice")

View file

@ -0,0 +1,16 @@
#!/usr/bin/env python3
import sys
import logging
import multiprocessing
def worker():
print('Doing some work...')
sys.stdout.flush()
multiprocessing.log_to_stderr(logging.DEBUG)
p = multiprocessing.Process(target=worker)
p.start()
p.join()

View file

@ -0,0 +1,14 @@
import time
import random
import multiprocessing
def worker(n):
sleep = random.randrange(1, 10)
time.sleep(sleep)
print("Worker {}: sleeping for {} seconds.".format(n, sleep))
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
p.start()

View file

@ -0,0 +1,11 @@
#!/usr/bin/env python3
from multiprocessing import Pool
def f(x):
return x*x
p = Pool(5)
print(p.map(f, [1, 2, 3]))

View file

@ -0,0 +1,27 @@
#!/usr/bin/env python3
import threading
x = 0
COUNT = 10000000
def foo():
global x
for i in range(COUNT):
x += 1
def bar():
global x
for i in range(COUNT):
x -= 1
t1 = threading.Thread(target=foo)
t2 = threading.Thread(target=bar)
t1.start()
t2.start()
t1.join()
t2.join()
print(x)

View file

@ -0,0 +1,16 @@
#!/usr/bin/env python3
import time
import random
import threading
def worker(n):
sleep = random.randrange(1, 10)
time.sleep(sleep)
print("Worker {} from {}: sleeping for {} seconds.".format(n, threading.get_ident(), sleep))
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
t.start()

View file

@ -0,0 +1,18 @@
#!/usr/bin/env python3
from time import sleep
from concurrent.futures import ThreadPoolExecutor
def return_after_5_secs(message):
sleep(5)
return message
pool = ThreadPoolExecutor(3)
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print(future.result())

View file

@ -0,0 +1,19 @@
from queue import Queue
from threading import Thread
NUM_WORKERS = 4
task_queue = Queue()
def worker():
while True:
address = task_queue.get()
run_function(address)
task_queue.task_done()
threads = [Thread(target=worker) for _ in range(NUM_WORKERS)]
[task_queue.put(item) for item in threads]
[thread.start() for thread in threads]
task_queue.join()

View file

@ -0,0 +1,4 @@
# Env constants to be used in the program
## General constants
PORT=8051

View file

@ -0,0 +1,26 @@
.PHONY: default clean run install test version dist
default: run
clean:
@find . -iname '*.py[co]' -delete
@find . -iname '__pycache__' -delete
@rm -rf dist/
@rm -rf build/
@rm -rf *.egg-info
run:
python3 app.py
install:
pip3 install -r requirements.txt
test:
pytest -vvv
dist: clean
python3 setup.py sdist
python3 setup.py bdist_wheel
version: dist
python3 setup.py --version

View file

@ -0,0 +1 @@
web: gunicorn app:server --workers 4

View file

@ -0,0 +1,63 @@
# Infrastructure Dashboards
This repository contains the source code for the infrastructure dashboards developed with [plot.ly and dash](https://dash.plot.ly/).
### Why Plotly
Plotly allows you to make beautiful and interactive dashboards in just a few lines of code, with data virtually any source that has a Python API.
### How do the Infrastructure Dashboards work?
Plotly objects consist of one or more data components and a layout component. Both have subcomponents. Most, but not all, of the formatting is controlled in the layout.
This app is divided into the following resources:
* `wrappers/`: where the API wrappers, `style.py` and `settings.py` live.
* `.env`: where all the constants and keys/secrets are set.
* `app.py`: entry point for the dashboard app: where the layout elements and the callback functions are set.
-----
## Running locally in dev mode
### Setting up
Add an `.env` file:
```
cp .env_example .env
```
Create an virtual environment and install dependencies:
```
virtualenv venv
source venv/bin/activate
```
### Installing
```
make install
```
### Running
Run server at localhost:
```
make run
```
The dahsboard should be available at `http://127.0.0.1:8051/` (note that the port is set in `.env`).
-------------
## Learning Resources
* [Build Your own Data Dashboard](https://towardsdatascience.com/build-your-own-data-dashboard-93e4848a0dcf).
* [A Python Programmers Guide to Dashboarding](https://medium.com/@drimik99/a-python-programmers-guide-to-dashboarding-part-1-8db0c48eee9d).
* [Interactive Python Dashboards with Plotly and Dash](https://www.udemy.com/course/interactive-python-dashboards-with-plotly-and-dash).
* [Make Your Data Visualizations Interactive with Plotly](https://towardsdatascience.com/its-2019-make-your-data-visualizations-interactive-with-plotly-b361e7d45dc6).

70
boilerplates/dash/app.py Normal file
View file

@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
import os
import dash
import plotly
import pandas as pd
import dash_table as dt
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output, State
# -----------------------------
# ------ Instantiate app ------
# -----------------------------
app = dash.Dash(__name__)
# -------------------------------
# ---- Start app HTML layout ----
# -------------------------------
app.layout = html.Div(children=[
# Main Title
html.Div([
html.H1(s.title_h1,
style=s.style_title_h1
)
],
),
# Tabs
dcc.Tabs([
dcc.Tab(label='tab1', children=[
html.Div([
html.H2('title 1',
style=s.style_title_h2
)
],
),
]),
dcc.Tab(label='tab2', children=[
html.Div([
html.H2('tab 2',
style=s.style_title_h2
)
],
),
]),
], style=s.style_all)
# ---------------------------------
# ----- App Callback methods ------
# ---------------------------------
# ---------------------------------
# ----------- Run web app ---------
# ---------------------------------
if __name__ == '__main__':
app.scripts.config.serve_locally = True
app.css.config.serve_locally = True
app.run_server(debug=True, port=se.PORT)

View file

@ -0,0 +1,31 @@
certifi==2022.12.7
chardet==3.0.4
click==6.7
dash==0.34.0
dash-core-components==0.27.1
dash-html-components==0.11.0
dash-renderer==0.13.0
decorator==4.3.0
Flask==1.0.2
Flask-Compress==1.4.0
gunicorn==19.9.0
idna==2.7
ipython-genutils==0.2.0
itsdangerous==0.24
Jinja2==2.11.3
jsonschema==2.6.0
jupyter-core==4.11.2
MarkupSafe==1.0
nbformat==4.4.0
plotly==3.1.1
pytz==2018.5
requests==2.20.0
retrying==1.3.3
six==1.11.0
traitlets==4.3.2
urllib3==1.26.5
Werkzeug==2.2.3
numpy==1.22.0
pandas==0.25.3
dash-table-experiments==0.6.0
python-dotenv==0.10.3

View file

@ -0,0 +1,12 @@
# Load all env variables from .env file
import os
from dotenv import load_dotenv
from pathlib import Path
env_path = Path('.') / '.env'
load_dotenv(dotenv_path=env_path)
# General constants
PORT = os.getenv('PORT')

View file

@ -0,0 +1,45 @@
# -*- coding: utf8 -*-
# Define dashboard styling
# -----------------------------
# --- Titles and strings ------
# -----------------------------
title_h1 = 'Big Title'
# -----------------------------
# --- elements ids ------------
# -----------------------------
# -----------------------------
# --- CSS style dictionaries --
# -----------------------------
color_palette = ['#052a4e', # dark blue
'#fd8283', # light red
'#e4faff'] # light blue
style_all = {'margin':'auto',
'padding':20,
'width':'95%',
'fontFamily':'helvetica',
'fontSize':'14',
'color':color_palette[0],
'background':color_palette[2],
}
style_title_h1 = {'textAlign':'left',
'color':color_palette[1],
'font-weight':'bold',
'fontSize':'32',
'text-transform':'uppercase',
'padding':20,
}
style_title_h2 = {'textAlign':'center',
'font-weight':'bold',
'text-transform':'uppercase',
'fontSize':'20',
}

View file

@ -0,0 +1,65 @@
## Notes on optimization in Python
<br>
### Peephole optimization
* optimization technique done at the compile time to improve code performance
* code is optimized behind the scenes and it's done either by pre-calculating constant expressions or using membership tests
* turn mutable constructs into immutable constructes
* turn both set and lists into constants
<br>
### Intern strings
* string objects in Python are sequences of Unicode characters, called text sequences
* when characters of different sizes are added to a string, its total size and weight increase, but not only by the size of the added character. Python also allocates extra information to store strings, which causes them to take up too much space.
* The idea behund string interning is to cache certain strings in memory as they are created, which has lot in common with shared objects. For instance, CPython loads shared objects into memory every time a Python interactive session is initialized.
<br>
### Profiling code
* use timeit
* use cProfile: advanced profiling, generates reports
<br>
### Use generator
- Generators dont' return ites such as iterators, but only one item at time.
<br>
### Use "C" equivalent of some Python libs
* they are the same features but with faster performance. Example: instead of Pickle, use cPickle, etc.
* Use PyPy package and Cytjon to optimiza a static compiler.
<br>
### Avoid using Globals
* Ugh for spaghetti code...
* Or make a local copy before using them inside loops.
<br>
### Use technology stacks
* redis for cache
* rabbitmq or celert for job queues and exports

View file

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
# Wrapper for K8s API
import os
from kubernetes import client, config
from wrappers.settings import ORG, K8S_ENV_LIST
def get_pod_data(namespace):
''' Retrieve and return data from K8s from a given namepasce.'''
config.load_kube_config()
api_instance = client.CoreV1Api()
try:
ret = api_instance.list_pod_for_all_namespaces(watch=False)
ip_list, name_list, stat_list = [], [], []
for item in ret.items:
if (item.metadata.namespace != namespace):
continue
ip_list.append(item.status.pod_ip)
name_list.append(item.metadata.name)
stat_list.append(item.status.phase)
dtag_list, dimage_list = [], []
for pod in name_list:
dtag_str, dimage_str = get_pod_info(namespace, api_instance, pod)
dtag_list.append(dtag_str)
dimage_list.append(dimage_str)
return {
'Status': stat_list,
'Name': name_list,
'Pod IP': ip_list,
'Docker Image': dimage_list,
'Docker Tag': dtag_list
}
except Exception as e:
print("Error retrieving K8s data: \n{}".format(e))
return None
def generate_env_dict():
''' Retrieve K8s namespaces from env, and return a dict of it. '''
env_list = K8S_ENV_LIST.split(',')
return [{'label': i.strip(), 'value': i.strip()} for i in env_list]
def get_pod_info(namespace, api_instance, pod_name):
''' Retrieve the Docker info from a given pod. '''
api_response = api_instance.read_namespaced_pod(pod_name, namespace)
dtag_str, dimage_str = '', ''
if len(api_response.spec.containers) < 2:
dimage_str, dtag_str = api_response.spec.containers[0].image.split(':')
else:
for container in api_response.spec.containers:
im, tg = container.image.split(':')
dimage_str += '{}; '.format(im)
dtag_str += '{}; '.format(tg)
return dtag_str, dimage_str
if __name__ == '__main__':
test_namespace = 'staging'
print('Printing k8s data for giving namespace')
print(get_pod_data(test_namespace))

View file

@ -0,0 +1,17 @@
import ctypes
class ICMP(ctypes.Structure):
_fields_ = [
('type', ctypes.c_ubyte),
('code', ctypes.c_ubyte),
('checksum', ctypes.c_ushort),
('unused', ctypes.c_ushort),
('next_hop_mtu',ctypes.c_ushort)
]
def __new__(self, socket_buffer):
return self.from_buffer_copy(socket_buffer)
def __init__(self, socket_buffer):
pass

View file

@ -0,0 +1,44 @@
import socket
import os
import struct
import ctypes
from ICMPHeader import ICMP
# host to listen on
HOST = '192.168.1.114'
def main():
socket_protocol = socket.IPPROTO_ICMP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind(( HOST, 0 ))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
while True:
raw_buffer = sniffer.recvfrom(65565)[0]
ip_header = raw_buffer[0:20]
iph = struct.unpack('!BBHHHBBH4s4s' , ip_header)
# Create our IP structure
version_ihl = iph[0]
version = version_ihl >> 4
ihl = version_ihl & 0xF
iph_length = ihl * 4
ttl = iph[5]
protocol = iph[6]
s_addr = socket.inet_ntoa(iph[8]);
d_addr = socket.inet_ntoa(iph[9]);
print('IP -> Version:' + str(version) + ', Header Length:' + str(ihl) + \
', TTL:' + str(ttl) + ', Protocol:' + str(protocol) + ', Source:'\
+ str(s_addr) + ', Destination:' + str(d_addr))
# Create our ICMP structure
buf = raw_buffer[iph_length:iph_length + ctypes.sizeof(ICMP)]
icmp_header = ICMP(buf)
print('ICMP -> Type:{0}, Code:{1}'.format((icmp_header.type, icmp_header.code)))
if __name__ == '__main__':
main()

View file

@ -0,0 +1,33 @@
import os
import socket
# host to listen
HOST = '192.168.1.114'
def sniffing(host, win, socket_prot):
while True:
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_prot)
sniffer.bind((host, 0))
# include the IP headers in the captured packets
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
if win == 1:
sniffer.ioctl(socket.SIO_RCVALL, socket_RCVALL_ON)
# read in a single packet
print(sniffer.recvfrom(65565))
def main(host):
if os.name == 'nt':
sniffing(host, 1, socket.IPPROTO_IP)
else:
sniffing(host, 0, socket.IPPROTO_ICMP)
if __name__ == '__main__':
main(HOST)

View file

@ -0,0 +1,69 @@
import os
import time
import socket
import struct
import ctypes
import threading
from netaddr import IPNetwork, IPAddress
from ICMPHeader import ICMP
# host to listen on
HOST = '192.168.1.114'
# subnet to target (iterates through all IP address in this subnet)
SUBNET = '192.168.1.0/24'
# string signature
MESSAGE = 'hellooooo'
def udp_sender(SUBNET, MESSAGE):
''' Sprays out the udp datagram'''
time.sleep(5)
sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for ip in IPNetwork(SUBNET):
try:
sender.sendto(MESSAGE, (str(ip), 65212))
except:
pass
def main():
t = threading.Thread(target=udp_sender, args=(SUBNET, MESSAGE))
t.start()
socket_protocol = socket.IPPROTO_ICMP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind(( HOST, 0 ))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
# continually read in packets and parse their information
while True:
raw_buffer = sniffer.recvfrom(65565)[0]
ip_header = raw_buffer[0:20]
iph = struct.unpack('!BBHHHBBH4s4s' , ip_header)
# Create our IP structure
version_ihl = iph[0]
ihl = version_ihl & 0xF
iph_length = ihl * 4
src_addr = socket.inet_ntoa(iph[8]);
# Create our ICMP structure
buf = raw_buffer[iph_length:iph_length + ctypes.sizeof(ICMP)]
icmp_header = ICMP(buf)
# check for the type 3 and code and within our target subnet
if icmp_header.code == 3 and icmp_header.type == 3:
if IPAddress(src_addr) in IPNetwork(SUBNET):
if raw_buffer[len(raw_buffer) - len(MESSAGE):] == MESSAGE:
print(f'Host up: {src_addr}')
if __name__ == '__main__':
main()

View file

@ -0,0 +1,6 @@
import socket
HOST = 'www.github.com'
PORT = 80
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((HOST, PORT))

View file

@ -0,0 +1,26 @@
import socket
PORT = 12345
HOSTNAME = '54.209.5.48'
def netcat(text_to_send):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOSTNAME, PORT))
s.sendall(text_to_send)
s.shutdown(socket.SHUT_WR)
rec_data = []
while 1:
data = s.recv(1024)
if not data:
break
rec_data.append(data)
s.close()
return rec_data
if __name__ == '__main__':
text_to_send = ''
text_recved = netcat( text_to_send)
print(text_recved[1])

View file

@ -0,0 +1,11 @@
DATA = 'GET / HTTP/1.1\r\nHost: google.com\r\n\r\n'
def tcp_client():
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((HOST, PORT))
client.send(DATA)
response = client.recv(4096)
print(response)
if __name__ == '__main__':
tcp_client()

View file

@ -0,0 +1,33 @@
import socket
import threading
BIND_IP = '0.0.0.0'
BIND_PORT = 9090
def handle_client(client_socket):
request = client_socket.recv(1024)
print(f'[*] Received: {request}')
client_socket.send('ACK')
client_socket.close()
def tcp_server():
server = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
server.bind(( BIND_IP, BIND_PORT))
server.listen(5)
print(f'[*] Listening on {BIND_IP}, {BIND_PORT}')
while 1:
client, addr = server.accept()
print(f'[*] Accepted connection: {addr[0]}:{addr[1]}')
client_handler = threading.Thread(target=handle_client, args= (client,))
client_handler.start()
if __name__ == '__main__':
tcp_server()

View file

@ -0,0 +1,16 @@
import socket
HOST = '127.0.0.1'
PORT = 9000
DATA = 'AAAAAAAAAA'
def udp_client():
client = socket.socket( socket.AF_INET, socket.SOCK_DGRAM)
client.sendto(DATA, ( HOST, PORT ))
data, addr = client.recvfrom(4096)
print(data, adr)
if __name__ == '__main__':
udp_client()

View file

@ -0,0 +1,18 @@
import socket
BIND_IP = '0.0.0.0'
BIND_PORT = 9000
def udp_server():
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(( BIND_IP, BIND_PORT))
print(f'Waiting on port: {str(BIND_PORT)}')
while 1:
data, addr = server.recvfrom(1024)
print(data)
if __name__ == '__main__':
udp_server()

View file

@ -0,0 +1,63 @@
import getopt
import paramiko
import socket
import threading
def main():
if not len(sys.argv[1:]):
print('Usage: ssh_server.py <SERVER> <PORT>')
return
# Create a socket object.
server = sys.argv[1]
ssh_port = int(sys.argv[2])
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((server, ssh_port))
sock.listen(100)
print('[+] Listening for connection ...')
client, addr = sock.accept()
except Exception, e:
print(f'[-] Connection Failed: {str(e)}')
return
print('[+] Connection Established!')
# Creating a paramiko object.
try:
Session = paramiko.Transport(client)
Session.add_server_key(HOST_KEY)
paramiko.util.log_to_file('filename.log')
server = Server()
try:
Session.start_server(server=server)
except paramiko.SSHException, x:
print('[-] SSH negotiation failed.')
return
chan = Session.accept(10)
print('[+] Authenticated!')
chan.send('Welcome to Buffy's SSH')
while 1:
try:
command = raw_input('Enter command: ').strip('\n')
if command != 'exit':
chan.send(command)
print chan.recv(1024) + '\n'
else:
chan.send('exit')
print('[*] Exiting ...')
session.close()
raise Exception('exit')
except KeyboardInterrupt:
session.close()
except Exception, e:
print(f'[-] Caught exception: {str(e)}')
try:
session.close()
except:
pass
if __name__ == '__main__':
main()

View file

@ -0,0 +1,17 @@
HOST_KEY = paramiko.RSAKey(filename='test_rsa.key')
USERNAME = 'buffy'
PASSWORD = 'killvampires'
class Server(paramiko.ServerInterface):
def __init__(self):
self.event = threading.Event()
def check_channel_request(self, kind, chanid):
if kind == 'session':
return paramiko.OPEN_SUCCEEDED
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
def check_auth_password(self, username, password):
if (username == USERNAME) and (password == PASSWORD):
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED

View file

@ -0,0 +1,16 @@
def ssh_client(ip, port, user, passwd):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, port=port, username=user, password=passwd)
ssh_session = client.get_transport().open_session()
if ssh_session.active:
print(ssh_session.recv(1024))
while 1:
command = ssh_session.recv(1024)
try:
cmd_output = subprocess.check_output(command, shell=True)
ssh_session.send(cmd_output)
except Exception, e:
ssh_session.send(str(e))
client.close()

View file

@ -0,0 +1,59 @@
import paramiko
import sys
import getopt
def main():
if not len(sys.argv[1:]):
usage()
IP = '0.0.0.0'
USER = ''
PASSWORD = ''
KEY = ''
COMMAND = ''
PORT = 0
try:
opts = getopt.getopt(sys.argv[2:],"p:u:a:i:c:", \
['PORT', 'USER', 'PASSWORD', 'KEY', 'COMMAND'])[0]
except getopt.GetoptError as err:
print str(err)
usage()
IP = sys.argv[1]
print(f'[*] Initializing connection to {IP}')
# Handle the options and arguments.
# TODO: add KeyError error handler.
for t in opts:
if t[0] in ('-a'):
PASSWORD = t[1]
elif t[0] in ('-i'):
KEY = t[1]
elif t[0] in ('-c'):
COMMAND = t[1]
elif t[0] in ('-p'):
PORT = int(t[1])
elif t[0] in ('-u'):
USER = t[1]
else:
print('This option does not exist!')
usage()
if USER:
print(f'[*] User set to {USER}')
if PORT:
print(f'[*] The port to be used is PORT}')
if PASSWORD:
print(f'[*] Password length {len(PASSWORD)} was submitted.')
if KEY:
print(f'[*] The key at {KEY} will be used.')
if COMMAND:
print(f'[*] Executing the command {COMMAND} in the host...')
else:
print('You need to specify the command to the host.')
usage()
# Start the client.
ssh_client(IP, PORT, USER, PASSWORD, KEY, COMMAND)
if __name__ == '__main__':
main()

View file

@ -0,0 +1,28 @@
.PHONY: setup install clean test lint
default: test
setup:
pip install -r requirements.txt
install:
python setup.py install
clean:
@find . -type f -name '*.pyc' -delete
@find . -type d -name '__pycache__' | xargs rm -rf
@find . -type d -name '*.ropeproject' | xargs rm -rf
@rm -rf build/
@rm -rf dist/
@rm -rf venv/
@rm -f src/*.egg*
@rm -f MANIFEST
@rm -rf docs/build/
@rm -f .coverage.*
test:
@tox -- -s
lint:
@tox -e lint

View file

@ -0,0 +1,8 @@
## testing in python
<br>
### resources
* [good example of unit tests suite I wrote](https://github.com/go-outside-labs/aws-pipeline/tree/master/tests)

View file

View file

@ -0,0 +1,15 @@
from setuptools import setup, find_packages
setup(
name='testing_app_name',
version='0.0.1',
packages=find_packages(),
include_package_data=True,
author='steinkirch',
install_requires=[
],
entry_points='''
[console_scripts]
testing_app_name=src.main:main
''',
)

View file

@ -0,0 +1,11 @@
#!/usr/bin/env python
import unittest
class AppNameTest(unittest.TestCase):
def setUp(self):
passd
def test_something(self):
pass

View file

@ -0,0 +1,47 @@
[tox]
envlist =
lint,py27,py35,py36,py37,py38,pypy,pypy3,docs,coverage-report,flake8,spelling
[testenv:coverage-report]
basepython = python3.7
skip_install = true
deps = coverage[toml]>=5.0.2
commands =
coverage combine
coverage report
[pipupgrade]
commands =
{envpython} -m pip install --upgrade pip
[testenv:docs]
changedir = {toxinidir}/docs
commands =
{[pipupgrade]commands}
sphinx-build -b html -d {envtmpdir}/doctrees . {envtmpdir}/html
{[cleanup]commands}
deps =
sphinx
sphinx_rtd_theme
[cleanup]
commands =
find {toxinidir}/tests -type f -name "*.pyc" -delete
find {toxinidir}/tests -type d -name "__pycache__" -delete
find {toxinidir}/src -type f -name "*.pyc" -delete
find {toxinidir}/src -type d -name "__pycache__" -delete
find {toxinidir}/src -type f -path "*.egg-info*" -delete
find {toxinidir}/src -type d -path "*.egg-info" -delete
[testenv:flake8]
changedir = {toxinidir}
deps = flake8
commands =
{[pipupgrade]commands}
flake8 {toxinidir}/src/django_registration {toxinidir}/tests
{[cleanup]commands}