spine.py
· 8.4 KiB · Python
Неформатований
import json
import sys
import os
import requests
import asyncio
import aiohttp
from uuid_decoder import decode_uuid
def parse_spine_material_config(config_json, base_url="", file_name="file"):
"""
Parse the SpineMaterial config.json file to extract and decode UUID information
Args:
config_json (str): JSON string containing the SpineMaterial configuration
base_url (str): Base URL for generating asset URLs
Returns:
dict: A dictionary containing the parsed and organized configuration data
"""
try:
config = config_json
decoded_data = {
"materials": [],
"paths": config.get("paths", {}),
"types": config.get("types", []),
"packs": config.get("packs", {}),
"name": config.get("name", ""),
"decoded_uuids": []
}
uuids = config.get("uuids", [])
print(uuids)
for uuid in uuids:
decoded = decode_uuid(uuid)
decoded_data["decoded_uuids"].append({
"original": uuid,
"decoded": decoded
})
native_map = {}
versions_native = config.get("versions", {}).get("native", [])
for idx in range(0, len(versions_native), 2):
if idx + 1 < len(versions_native):
native_map[config["uuids"][versions_native[idx]]] = versions_native[idx + 1]
import_map = {}
versions_import = config.get("versions", {}).get("import", [])
for idx in range(0, len(versions_import), 2):
if idx + 1 < len(versions_import):
if isinstance(versions_import[idx], int):
if versions_import[idx] < len(uuids):
import_map[uuids[versions_import[idx]]] = versions_import[idx + 1]
else:
import_map[versions_import[idx]] = versions_import[idx + 1]
decoded_data["native_map"] = native_map
decoded_data["import_map"] = import_map
for path_index, path_info in config.get("paths", {}).items():
index = int(path_index)
material_name = path_info[0]
# if 'Avatar' not in material_name :
# continue
material_type_index = path_info[1]
material_type = config.get("types", [])[material_type_index] if material_type_index < len(config.get("types", [])) else "unknown"
print(material_type)
uuid = uuids[index] if index < len(uuids) else "unknown"
decoded_uuid = decode_uuid(uuid)
url = ""
extension = "unknown"
if base_url and uuid in native_map:
first_two = decoded_uuid[:2]
url = f"{base_url}{config.get('name', '')}/native/{first_two}/{decoded_uuid}.{native_map[uuid]}"
if material_type == "sp.SkeletonData":
url = f"{base_url}{config.get('name', '')}/native/{first_two}/{decoded_uuid}.{native_map[uuid]}.bin"
extension = "bin"
elif material_type == "cc.Asset":
url = f"{base_url}{config.get('name', '')}/native/{first_two}/{decoded_uuid}.{native_map[uuid]}.atlas"
extension = "atlas"
elif material_type == "cc.Texture2D":
url = f"{base_url}{config.get('name', '')}/native/{first_two}/{decoded_uuid}.{native_map[uuid]}.png"
extension = "png"
else:
continue
if not url:
continue
decoded_data["materials"].append({
"path": material_name,
"name": material_name.split("Avatar/")[-1] + "." + extension,
"type": material_type,
"uuid": uuid,
"hash": native_map[uuid],
"decoded_uuid": decoded_uuid,
"url": url,
"extension": extension
})
return decoded_data["materials"]
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return None
except Exception as e:
print(f"Error processing config: {e}")
return None
async def download_file(session, url, target_path):
"""Download a single file asynchronously"""
try:
async with session.get(url) as response:
if response.status == 200:
with open(target_path, 'wb') as f:
f.write(await response.read())
print(f"Successfully downloaded {os.path.basename(target_path)}")
return True
else:
print(f"Failed to download {url}, status code: {response.status}")
return False
except Exception as e:
# print(f"Error downloading {url}: {e}")
return False
async def download_assets_async():
"""
Parse JSON files in output/l2d_data directory and download assets to output/downloads
using asynchronous requests for improved performance
"""
input_path = './output/l2d_data/'
output_path = './output/downloads/all_files'
# Create base output directory if it doesn't exist
os.makedirs(output_path, exist_ok=True)
download_tasks = []
# Configure connection pool with limits
conn = aiohttp.TCPConnector(limit=10)
async with aiohttp.ClientSession(connector=conn) as session:
# Collect all download tasks
for file in os.listdir(input_path):
if not file.startswith("Hero_"):
continue
file_path = os.path.join(input_path, file)
try:
with open(file_path, 'r') as f:
materials = json.load(f)
for material in materials:
path = material.get("path", "")
name = material.get("name", "")
url = material.get("url", "")
if not url:
continue
# Create directory based on path
target_dir = os.path.join(output_path, path)
print(target_dir)
os.makedirs(target_dir, exist_ok=True)
# Set up the download
target_path = os.path.join(target_dir, name)
print(f"Queuing download: {url} to {target_path}")
# Add download task to the list
download_tasks.append(download_file(session, url, target_path))
except Exception as e:
print(f"Error processing {file}: {e}")
# Execute all download tasks concurrently
if download_tasks:
print(f"Starting {len(download_tasks)} downloads...")
await asyncio.gather(*download_tasks)
print("All downloads completed")
else:
print("No files to download")
def download_assets():
"""Wrapper to run the async download function"""
asyncio.run(download_assets_async())
def main():
base_url = "https://highschool.pro.g123-cpp.com/G123Live/assets/"
config_path = './configs/downloads/'
output_path = './output/l2d_data/'
# Process config files and generate JSON output
for file in os.listdir(config_path):
if not 'Hero_' in file:
continue
print(file)
# continue
file_path = os.path.join(config_path, file)
try:
with open(f'{file_path}', 'r') as in_file:
config_json = json.load(in_file)
except Exception as e:
print(f"Error reading file: {e}")
continue
decoded_data = parse_spine_material_config(config_json, base_url, file)
if decoded_data:
output_file = os.path.join(output_path, file)
try:
os.makedirs(os.path.dirname(output_file), exist_ok=True)
with open(output_file, 'w') as file:
json.dump(decoded_data, file, indent=4)
print(f"Results written to {output_file}")
except Exception as e:
print(f"Error writing to {output_file}: {e}")
# Download assets from the processed JSON files
download_assets()
if __name__ == "__main__":
main()
1 | import json |
2 | import sys |
3 | import os |
4 | import requests |
5 | import asyncio |
6 | import aiohttp |
7 | from uuid_decoder import decode_uuid |
8 | |
9 | def parse_spine_material_config(config_json, base_url="", file_name="file"): |
10 | """ |
11 | Parse the SpineMaterial config.json file to extract and decode UUID information |
12 | |
13 | Args: |
14 | config_json (str): JSON string containing the SpineMaterial configuration |
15 | base_url (str): Base URL for generating asset URLs |
16 | |
17 | Returns: |
18 | dict: A dictionary containing the parsed and organized configuration data |
19 | """ |
20 | try: |
21 | config = config_json |
22 | |
23 | decoded_data = { |
24 | "materials": [], |
25 | "paths": config.get("paths", {}), |
26 | "types": config.get("types", []), |
27 | "packs": config.get("packs", {}), |
28 | "name": config.get("name", ""), |
29 | "decoded_uuids": [] |
30 | } |
31 | |
32 | uuids = config.get("uuids", []) |
33 | print(uuids) |
34 | for uuid in uuids: |
35 | decoded = decode_uuid(uuid) |
36 | decoded_data["decoded_uuids"].append({ |
37 | "original": uuid, |
38 | "decoded": decoded |
39 | }) |
40 | |
41 | native_map = {} |
42 | versions_native = config.get("versions", {}).get("native", []) |
43 | for idx in range(0, len(versions_native), 2): |
44 | if idx + 1 < len(versions_native): |
45 | native_map[config["uuids"][versions_native[idx]]] = versions_native[idx + 1] |
46 | |
47 | import_map = {} |
48 | versions_import = config.get("versions", {}).get("import", []) |
49 | for idx in range(0, len(versions_import), 2): |
50 | if idx + 1 < len(versions_import): |
51 | if isinstance(versions_import[idx], int): |
52 | if versions_import[idx] < len(uuids): |
53 | import_map[uuids[versions_import[idx]]] = versions_import[idx + 1] |
54 | else: |
55 | import_map[versions_import[idx]] = versions_import[idx + 1] |
56 | |
57 | decoded_data["native_map"] = native_map |
58 | decoded_data["import_map"] = import_map |
59 | |
60 | for path_index, path_info in config.get("paths", {}).items(): |
61 | index = int(path_index) |
62 | material_name = path_info[0] |
63 | # if 'Avatar' not in material_name : |
64 | # continue |
65 | material_type_index = path_info[1] |
66 | material_type = config.get("types", [])[material_type_index] if material_type_index < len(config.get("types", [])) else "unknown" |
67 | print(material_type) |
68 | uuid = uuids[index] if index < len(uuids) else "unknown" |
69 | decoded_uuid = decode_uuid(uuid) |
70 | |
71 | url = "" |
72 | extension = "unknown" |
73 | if base_url and uuid in native_map: |
74 | first_two = decoded_uuid[:2] |
75 | url = f"{base_url}{config.get('name', '')}/native/{first_two}/{decoded_uuid}.{native_map[uuid]}" |
76 | if material_type == "sp.SkeletonData": |
77 | url = f"{base_url}{config.get('name', '')}/native/{first_two}/{decoded_uuid}.{native_map[uuid]}.bin" |
78 | extension = "bin" |
79 | elif material_type == "cc.Asset": |
80 | url = f"{base_url}{config.get('name', '')}/native/{first_two}/{decoded_uuid}.{native_map[uuid]}.atlas" |
81 | extension = "atlas" |
82 | elif material_type == "cc.Texture2D": |
83 | url = f"{base_url}{config.get('name', '')}/native/{first_two}/{decoded_uuid}.{native_map[uuid]}.png" |
84 | extension = "png" |
85 | else: |
86 | continue |
87 | if not url: |
88 | continue |
89 | decoded_data["materials"].append({ |
90 | "path": material_name, |
91 | "name": material_name.split("Avatar/")[-1] + "." + extension, |
92 | "type": material_type, |
93 | "uuid": uuid, |
94 | "hash": native_map[uuid], |
95 | "decoded_uuid": decoded_uuid, |
96 | "url": url, |
97 | "extension": extension |
98 | }) |
99 | |
100 | return decoded_data["materials"] |
101 | |
102 | except json.JSONDecodeError as e: |
103 | print(f"Error parsing JSON: {e}") |
104 | return None |
105 | except Exception as e: |
106 | print(f"Error processing config: {e}") |
107 | return None |
108 | |
109 | async def download_file(session, url, target_path): |
110 | """Download a single file asynchronously""" |
111 | try: |
112 | async with session.get(url) as response: |
113 | if response.status == 200: |
114 | with open(target_path, 'wb') as f: |
115 | f.write(await response.read()) |
116 | print(f"Successfully downloaded {os.path.basename(target_path)}") |
117 | return True |
118 | else: |
119 | print(f"Failed to download {url}, status code: {response.status}") |
120 | return False |
121 | except Exception as e: |
122 | # print(f"Error downloading {url}: {e}") |
123 | return False |
124 | |
125 | async def download_assets_async(): |
126 | """ |
127 | Parse JSON files in output/l2d_data directory and download assets to output/downloads |
128 | using asynchronous requests for improved performance |
129 | """ |
130 | input_path = './output/l2d_data/' |
131 | output_path = './output/downloads/all_files' |
132 | |
133 | # Create base output directory if it doesn't exist |
134 | os.makedirs(output_path, exist_ok=True) |
135 | |
136 | download_tasks = [] |
137 | |
138 | # Configure connection pool with limits |
139 | conn = aiohttp.TCPConnector(limit=10) |
140 | async with aiohttp.ClientSession(connector=conn) as session: |
141 | # Collect all download tasks |
142 | for file in os.listdir(input_path): |
143 | if not file.startswith("Hero_"): |
144 | continue |
145 | |
146 | file_path = os.path.join(input_path, file) |
147 | |
148 | try: |
149 | with open(file_path, 'r') as f: |
150 | materials = json.load(f) |
151 | |
152 | for material in materials: |
153 | path = material.get("path", "") |
154 | name = material.get("name", "") |
155 | url = material.get("url", "") |
156 | |
157 | if not url: |
158 | continue |
159 | |
160 | # Create directory based on path |
161 | target_dir = os.path.join(output_path, path) |
162 | print(target_dir) |
163 | os.makedirs(target_dir, exist_ok=True) |
164 | |
165 | # Set up the download |
166 | target_path = os.path.join(target_dir, name) |
167 | print(f"Queuing download: {url} to {target_path}") |
168 | |
169 | # Add download task to the list |
170 | download_tasks.append(download_file(session, url, target_path)) |
171 | |
172 | except Exception as e: |
173 | print(f"Error processing {file}: {e}") |
174 | |
175 | # Execute all download tasks concurrently |
176 | if download_tasks: |
177 | print(f"Starting {len(download_tasks)} downloads...") |
178 | await asyncio.gather(*download_tasks) |
179 | print("All downloads completed") |
180 | else: |
181 | print("No files to download") |
182 | |
183 | def download_assets(): |
184 | """Wrapper to run the async download function""" |
185 | asyncio.run(download_assets_async()) |
186 | |
187 | def main(): |
188 | base_url = "https://highschool.pro.g123-cpp.com/G123Live/assets/" |
189 | config_path = './configs/downloads/' |
190 | output_path = './output/l2d_data/' |
191 | |
192 | # Process config files and generate JSON output |
193 | for file in os.listdir(config_path): |
194 | if not 'Hero_' in file: |
195 | continue |
196 | print(file) |
197 | # continue |
198 | file_path = os.path.join(config_path, file) |
199 | try: |
200 | with open(f'{file_path}', 'r') as in_file: |
201 | config_json = json.load(in_file) |
202 | except Exception as e: |
203 | print(f"Error reading file: {e}") |
204 | continue |
205 | |
206 | decoded_data = parse_spine_material_config(config_json, base_url, file) |
207 | if decoded_data: |
208 | output_file = os.path.join(output_path, file) |
209 | try: |
210 | os.makedirs(os.path.dirname(output_file), exist_ok=True) |
211 | with open(output_file, 'w') as file: |
212 | json.dump(decoded_data, file, indent=4) |
213 | print(f"Results written to {output_file}") |
214 | except Exception as e: |
215 | print(f"Error writing to {output_file}: {e}") |
216 | |
217 | # Download assets from the processed JSON files |
218 | download_assets() |
219 | |
220 | if __name__ == "__main__": |
221 | main() |