mirror of
https://github.com/kenzok8/small-package.git
synced 2026-02-05 14:26:24 +08:00
update 2024-11-27 14:16:39
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
requests
|
||||
fake-useragent
|
||||
tqdm
|
||||
systemd-python
|
||||
fastapi
|
||||
uvicorn
|
||||
@@ -9,43 +9,43 @@ import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
|
||||
import requests
|
||||
from fake_useragent import UserAgent
|
||||
from tqdm import tqdm
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.responses import Response
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from uvicorn import Config, Server
|
||||
|
||||
ua = UserAgent()
|
||||
|
||||
PORT = 37491
|
||||
|
||||
|
||||
class MyHandler(http.server.SimpleHTTPRequestHandler):
|
||||
def log_message(self, format, *args):
|
||||
pass
|
||||
app = FastAPI()
|
||||
|
||||
def do_GET(self):
|
||||
user_agent = self.headers.get('User-Agent')
|
||||
@app.get("/")
|
||||
async def root(request: Request):
|
||||
user_agent = request.headers.get("user-agent")
|
||||
|
||||
# assert user_agent only contains F
|
||||
if not all([c == 'F' for c in user_agent]):
|
||||
self.send_response(400)
|
||||
logging.error(f"Invalid User-Agent: {user_agent}")
|
||||
else:
|
||||
self.send_response(200)
|
||||
self.end_headers()
|
||||
ua_len = len(user_agent)
|
||||
self.wfile.write(str(ua_len).encode())
|
||||
if not all(c == 'F' for c in user_agent):
|
||||
return Response(status_code=400)
|
||||
|
||||
return Response(content=str(len(user_agent)).encode())
|
||||
|
||||
def start_server():
|
||||
with socketserver.TCPServer(('', PORT), MyHandler, bind_and_activate=False) as httpd:
|
||||
httpd.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
|
||||
httpd.server_bind()
|
||||
httpd.server_activate()
|
||||
print(f"Serving on port {PORT}")
|
||||
httpd.serve_forever()
|
||||
atexit.register(httpd.shutdown)
|
||||
|
||||
config4 = Config(app=app, host="127.0.0.1", port=PORT, access_log=False)
|
||||
config6 = Config(app=app, host="::1", port=PORT, access_log=False)
|
||||
server4 = Server(config4)
|
||||
server6 = Server(config6)
|
||||
t4 = threading.Thread(target=server4.run)
|
||||
t4.daemon = True
|
||||
t6 = threading.Thread(target=server6.run)
|
||||
t6.daemon = True
|
||||
t4.start()
|
||||
t6.start()
|
||||
|
||||
def start_ua2f(u: str):
|
||||
p = subprocess.Popen([u])
|
||||
@@ -73,11 +73,7 @@ if __name__ == "__main__":
|
||||
|
||||
setup_iptables()
|
||||
|
||||
server_thread = threading.Thread(target=start_server)
|
||||
server_thread.daemon = True
|
||||
server_thread.start()
|
||||
|
||||
print(f"Starting server on port {PORT}")
|
||||
start_server()
|
||||
|
||||
ua2f_thread = threading.Thread(target=start_ua2f, args=(ua2f,))
|
||||
ua2f_thread.daemon = True
|
||||
@@ -87,7 +83,7 @@ if __name__ == "__main__":
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
for i in tqdm(range(2000)):
|
||||
for i in tqdm(range(1024)):
|
||||
nxt = ua.random
|
||||
response = requests.get(f"http://127.0.0.1:{PORT}", headers={
|
||||
"User-Agent": nxt
|
||||
@@ -95,7 +91,7 @@ if __name__ == "__main__":
|
||||
assert response.ok
|
||||
assert response.text == str(len(nxt))
|
||||
|
||||
for i in tqdm(range(2000)):
|
||||
for i in tqdm(range(4096)):
|
||||
nxt = ua.random
|
||||
response = requests.get(f"http://[::1]:{PORT}", headers={
|
||||
"User-Agent": nxt
|
||||
|
||||
Reference in New Issue
Block a user