10 Commits

6 changed files with 267 additions and 12 deletions

View File

@@ -3,10 +3,11 @@
## Modules ## Modules
The `libWiiPy.archive` package contains modules for packing and extracting archive formats used by the Wii. This currently includes packing and unpacking support for U8 archives and decompression support for ASH archives. The `libWiiPy.archive` package contains modules for packing and extracting archive formats used by the Wii. This currently includes packing and unpacking support for U8 archives and decompression support for ASH archives.
| Module | Description | | Module | Description |
|--------------------------------------|---------------------------------------------------------| |----------------------------------------|---------------------------------------------------------|
| [libWiiPy.archive.ash](/archive/ash) | Provides support for decompressing ASH archives | | [libWiiPy.archive.ash](/archive/ash) | Provides support for decompressing ASH archives |
| [libWiiPy.archive.u8](/archive/u8) | Provides support for packing and extracting U8 archives | | [libWiiPy.archive.lz77](/archive/lz77) | Provides support for the LZ77 compression scheme |
| [libWiiPy.archive.u8](/archive/u8) | Provides support for packing and extracting U8 archives |
### libWiiPy.archive Package Contents ### libWiiPy.archive Package Contents
@@ -14,5 +15,6 @@ The `libWiiPy.archive` package contains modules for packing and extracting archi
:maxdepth: 4 :maxdepth: 4
/archive/ash /archive/ash
/archive/lz77
/archive/u8 /archive/u8
``` ```

View File

@@ -0,0 +1,12 @@
# libWiiPy.archive.lz77 Module
The `libWiiPy.archive.lz77` module provides support for handling LZ77 compression, which is a compression format used across the Wii and other Nintendo consoles.
## Module Contents
```{eval-rst}
.. automodule:: libWiiPy.archive.lz77
:members:
:undoc-members:
:show-inheritance:
```

View File

@@ -4,6 +4,155 @@
# See https://wiibrew.org/wiki/LZ77 for details about the LZ77 compression format. # See https://wiibrew.org/wiki/LZ77 for details about the LZ77 compression format.
import io import io
from dataclasses import dataclass as _dataclass
_LZ_MIN_DISTANCE = 0x01 # Minimum distance for each reference.
_LZ_MAX_DISTANCE = 0x1000 # Maximum distance for each reference.
_LZ_MIN_LENGTH = 0x03 # Minimum length for each reference.
_LZ_MAX_LENGTH = 0x12 # Maximum length for each reference.
@_dataclass
class _LZNode:
dist: int = 0
len: int = 0
weight: int = 0
def _compress_compare_bytes(byte1: bytes, offset1: int, byte2: bytes, offset2: int, abs_len_max: int) -> int:
# Compare bytes up to the maximum length we can match.
num_matched = 0
while num_matched < abs_len_max:
if byte1[offset1 + num_matched] != byte2[offset2 + num_matched]:
break
num_matched += 1
return num_matched
def _compress_search_matches(buffer: bytes, pos: int) -> (int, int):
bytes_left = len(buffer) - pos
global _LZ_MAX_DISTANCE, _LZ_MAX_LENGTH, _LZ_MIN_DISTANCE
# Default to only looking back 4096 bytes, unless we've moved fewer than 4096 bytes, in which case we should
# only look as far back as we've gone.
max_dist = min(_LZ_MAX_DISTANCE, pos)
# Default to only matching up to 18 bytes, unless fewer than 18 bytes remain, in which case we can only match
# up to that many bytes.
max_len = min(_LZ_MAX_LENGTH, bytes_left)
# Log the longest match we found and its offset.
biggest_match, biggest_match_pos = 0, 0
# Search for matches.
for i in range(_LZ_MIN_DISTANCE, max_dist + 1):
num_matched = _compress_compare_bytes(buffer, pos - i, buffer, pos, max_len)
if num_matched > biggest_match:
biggest_match = num_matched
biggest_match_pos = i
if biggest_match == max_len:
break
return biggest_match, biggest_match_pos
def _compress_node_is_ref(node: _LZNode) -> bool:
return node.len >= _LZ_MIN_LENGTH
def _compress_get_node_cost(length: int) -> int:
if length >= _LZ_MIN_LENGTH:
num_bytes = 2
else:
num_bytes = 1
return 1 + (num_bytes * 8)
def compress_lz77(data: bytes) -> bytes:
"""
Compresses data using the Wii's LZ77 compression algorithm and returns the compressed result.
Parameters
----------
data: bytes
The data to compress.
Returns
-------
bytes
The LZ77-compressed data.
"""
nodes = [_LZNode() for _ in range(len(data))]
# Iterate over the uncompressed data, starting from the end.
pos = len(data)
global _LZ_MAX_LENGTH, _LZ_MIN_LENGTH, _LZ_MIN_DISTANCE
while pos:
pos -= 1
node = nodes[pos]
# Limit the maximum search length when we're near the end of the file.
max_search_len = min(_LZ_MAX_LENGTH, len(data) - pos)
if max_search_len < _LZ_MIN_DISTANCE:
max_search_len = 1
# Initialize as 1 for each, since that's all we could use if we weren't compressing.
length, dist = 1, 1
if max_search_len >= _LZ_MIN_LENGTH:
length, dist = _compress_search_matches(data, pos)
# Treat as direct bytes if it's too short to copy.
if length == 0 or length < _LZ_MIN_LENGTH:
length = 1
# If the node goes to the end of the file, the weight is the cost of the node.
if (pos + length) == len(data):
node.len = length
node.dist = dist
node.weight = _compress_get_node_cost(length)
# Otherwise, search for possible matches and determine the one with the best cost.
else:
weight_best = 0xFFFFFFFF # This was originally UINT_MAX, but that isn't a thing here so 32-bit it is!
len_best = 1
while length:
weight_next = nodes[pos + length].weight
weight = _compress_get_node_cost(length) + weight_next
if weight < weight_best:
len_best = length
weight_best = weight
length -= 1
if length != 0 and length < _LZ_MIN_LENGTH:
length = 1
node.len = len_best
node.dist = dist
node.weight = weight_best
# Write the header data.
with io.BytesIO() as buffer:
# Write the header data.
buffer.write(b'LZ77\x10') # The LZ type on the Wii is *always* 0x10.
buffer.write(len(data).to_bytes(3, 'little'))
src_pos = 0
while src_pos < len(data):
head = 0
head_pos = buffer.tell()
buffer.write(b'\x00') # Reserve a byte for the chunk head.
i = 0
while i < 8 and src_pos < len(data):
current_node = nodes[src_pos]
length = current_node.len
dist = current_node.dist
# This is a reference node.
if _compress_node_is_ref(current_node):
encoded = (((length - _LZ_MIN_LENGTH) & 0xF) << 12) | ((dist - _LZ_MIN_DISTANCE) & 0xFFF)
buffer.write(encoded.to_bytes(2))
head = (head | (1 << (7 - i))) & 0xFF
# This is a direct copy node.
else:
buffer.write(data[src_pos:src_pos + 1])
src_pos += length
i += 1
pos = buffer.tell()
buffer.seek(head_pos)
buffer.write(head.to_bytes(1))
buffer.seek(pos)
buffer.seek(0)
out_data = buffer.read()
return out_data
def decompress_lz77(lz77_data: bytes) -> bytes: def decompress_lz77(lz77_data: bytes) -> bytes:

View File

@@ -88,7 +88,14 @@ def download_tmd(title_id: str, title_version: int = None, wiiu_endpoint: bool =
if title_version is not None: if title_version is not None:
tmd_url += "." + str(title_version) tmd_url += "." + str(title_version)
# Make the request. # Make the request.
tmd_request = requests.get(url=tmd_url, headers={'User-Agent': 'wii libnup/1.0'}, stream=True) try:
tmd_request = requests.get(url=tmd_url, headers={'User-Agent': 'wii libnup/1.0'}, stream=True)
except requests.exceptions.ConnectionError:
if endpoint_override:
raise ValueError("A connection could not be made to the NUS endpoint. Please make sure that your endpoint "
"override is valid.")
else:
raise Exception("A connection could not be made to the NUS endpoint. The NUS may be unavailable.")
# Handle a 404 if the TID/version doesn't exist. # Handle a 404 if the TID/version doesn't exist.
if tmd_request.status_code != 200: if tmd_request.status_code != 200:
raise ValueError("The requested Title ID or TMD version does not exist. Please check the Title ID and Title" raise ValueError("The requested Title ID or TMD version does not exist. Please check the Title ID and Title"
@@ -133,7 +140,14 @@ def download_ticket(title_id: str, wiiu_endpoint: bool = False, endpoint_overrid
endpoint_url = _nus_endpoint[0] endpoint_url = _nus_endpoint[0]
ticket_url = endpoint_url + title_id + "/cetk" ticket_url = endpoint_url + title_id + "/cetk"
# Make the request. # Make the request.
ticket_request = requests.get(url=ticket_url, headers={'User-Agent': 'wii libnup/1.0'}, stream=True) try:
ticket_request = requests.get(url=ticket_url, headers={'User-Agent': 'wii libnup/1.0'}, stream=True)
except requests.exceptions.ConnectionError:
if endpoint_override:
raise ValueError("A connection could not be made to the NUS endpoint. Please make sure that your endpoint "
"override is valid.")
else:
raise Exception("A connection could not be made to the NUS endpoint. The NUS may be unavailable.")
if ticket_request.status_code != 200: if ticket_request.status_code != 200:
raise ValueError("The requested Title ID does not exist, or refers to a non-free title. Tickets can only" raise ValueError("The requested Title ID does not exist, or refers to a non-free title. Tickets can only"
" be downloaded for titles that are free on the NUS.") " be downloaded for titles that are free on the NUS.")
@@ -173,8 +187,15 @@ def download_cert_chain(wiiu_endpoint: bool = False, endpoint_override: str = No
endpoint_url = _nus_endpoint[0] endpoint_url = _nus_endpoint[0]
tmd_url = endpoint_url + "0000000100000002/tmd.513" tmd_url = endpoint_url + "0000000100000002/tmd.513"
cetk_url = endpoint_url + "0000000100000002/cetk" cetk_url = endpoint_url + "0000000100000002/cetk"
tmd = requests.get(url=tmd_url, headers={'User-Agent': 'wii libnup/1.0'}, stream=True).content try:
cetk = requests.get(url=cetk_url, headers={'User-Agent': 'wii libnup/1.0'}, stream=True).content tmd = requests.get(url=tmd_url, headers={'User-Agent': 'wii libnup/1.0'}, stream=True).content
cetk = requests.get(url=cetk_url, headers={'User-Agent': 'wii libnup/1.0'}, stream=True).content
except requests.exceptions.ConnectionError:
if endpoint_override:
raise ValueError("A connection could not be made to the NUS endpoint. Please make sure that your endpoint "
"override is valid.")
else:
raise Exception("A connection could not be made to the NUS endpoint. The NUS may be unavailable.")
# Assemble the certificate chain. # Assemble the certificate chain.
cert_chain = b'' cert_chain = b''
# Certificate Authority data. # Certificate Authority data.
@@ -184,8 +205,9 @@ def download_cert_chain(wiiu_endpoint: bool = False, endpoint_override: str = No
# XS (Ticket certificate) data. # XS (Ticket certificate) data.
cert_chain += cetk[0x2A4:0x2A4 + 768] cert_chain += cetk[0x2A4:0x2A4 + 768]
# Since the cert chain is always the same, check the hash to make sure nothing went wildly wrong. # Since the cert chain is always the same, check the hash to make sure nothing went wildly wrong.
if hashlib.sha1(cert_chain).hexdigest() != "ace0f15d2a851c383fe4657afc3840d6ffe30ad0": # This is currently disabled because of the possibility that one may be downloading non-retail certs (gasp!).
raise Exception("An unknown error has occurred downloading and creating the certificate.") #if hashlib.sha1(cert_chain).hexdigest() != "ace0f15d2a851c383fe4657afc3840d6ffe30ad0":
# raise Exception("An unknown error has occurred downloading and creating the certificate.")
return cert_chain return cert_chain
@@ -224,7 +246,14 @@ def download_content(title_id: str, content_id: int, wiiu_endpoint: bool = False
endpoint_url = _nus_endpoint[0] endpoint_url = _nus_endpoint[0]
content_url = endpoint_url + title_id + "/000000" + content_id_hex content_url = endpoint_url + title_id + "/000000" + content_id_hex
# Make the request. # Make the request.
content_request = requests.get(url=content_url, headers={'User-Agent': 'wii libnup/1.0'}, stream=True) try:
content_request = requests.get(url=content_url, headers={'User-Agent': 'wii libnup/1.0'}, stream=True)
except requests.exceptions.ConnectionError:
if endpoint_override:
raise ValueError("A connection could not be made to the NUS endpoint. Please make sure that your endpoint "
"override is valid.")
else:
raise Exception("A connection could not be made to the NUS endpoint. The NUS may be unavailable.")
if content_request.status_code != 200: if content_request.status_code != 200:
raise ValueError("The requested Title ID does not exist, or an invalid Content ID is present in the" raise ValueError("The requested Title ID does not exist, or an invalid Content ID is present in the"
" content records provided.\n Failed while downloading Content ID: 000000" + " content records provided.\n Failed while downloading Content ID: 000000" +

View File

@@ -160,7 +160,8 @@ class Ticket:
limit_value = int.from_bytes(ticket_data.read(4)) limit_value = int.from_bytes(ticket_data.read(4))
self.title_limits_list.append(_TitleLimit(limit_type, limit_value)) self.title_limits_list.append(_TitleLimit(limit_type, limit_value))
# Check certs to see if this is a retail or dev ticket. Treats unknown certs as being retail for now. # Check certs to see if this is a retail or dev ticket. Treats unknown certs as being retail for now.
if self.signature_issuer.find("Root-CA00000002-XS00000006") != -1: if (self.signature_issuer.find("Root-CA00000002-XS00000006") != -1 or
self.signature_issuer.find("Root-CA00000002-XS00000004") != -1):
self.is_dev = True self.is_dev = True
else: else:
self.is_dev = False self.is_dev = False

View File

@@ -0,0 +1,62 @@
# "title/wiiload.py" from libWiiPy by NinjaCheetah & Contributors
# https://github.com/NinjaCheetah/libWiiPy
#
# This code is adapted from "wiiload.py", which can be found on the WiiBrew page for Wiiload.
# https://pastebin.com/4nWAkBpw
#
# See https://wiibrew.org/wiki/Wiiload for details about how Wiiload works
import sys
import zlib
import socket
import struct
def send_bin_wiiload(target_ip: str, bin_data: bytes, name: str) -> None:
"""
Sends an ELF or DOL binary to The Homebrew Channel via Wiiload. This requires the IP address of the console you
want to send the binary to.
Parameters
----------
target_ip: str
The IP address of the console to send the binary to.
bin_data: bytes
The data of the ELF or DOL to send.
name: str
The name of the application being sent.
"""
wii_ip = (target_ip, 4299)
WIILOAD_VERSION_MAJOR=0
WIILOAD_VERSION_MINOR=5
len_uncompressed = len(bin_data)
c_data = zlib.compress(bin_data, 6)
chunk_size = 1024*128
chunks = [c_data[i:i+chunk_size] for i in range(0, len(c_data), chunk_size)]
args = [name]
args = "\x00".join(args) + "\x00"
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(wii_ip)
s.send("HAXX")
s.send(struct.pack("B", WIILOAD_VERSION_MAJOR)) # one byte, unsigned
s.send(struct.pack("B", WIILOAD_VERSION_MINOR)) # one byte, unsigned
s.send(struct.pack(">H",len(args))) # bigendian, 2 bytes, unsigned
s.send(struct.pack(">L",len(c_data))) # bigendian, 4 bytes, unsigned
s.send(struct.pack(">L",len_uncompressed)) # bigendian, 4 bytes, unsigned
print(len(chunks),"chunks to send")
for piece in chunks:
s.send(piece)
sys.stdout.write("."); sys.stdout.flush()
sys.stdout.write("\n")
s.send(args)
s.close()
print("done")