controllers.py
from workers import worker_func
from threads import log_notifier, stat_updater
from utils import slice_list, slice_range, parse_proxy_string
from multiprocessing import Process, Queue
from threading import Thread
class Controller():
def __init__(self, arguments):
self.arguments = arguments
self.workers = []
self.proxies = []
self.count_queue = Queue()
self.log_queue = Queue()
if self.arguments.proxy_file:
self.load_proxies()
self.start_stat_updater()
self.start_log_notifier()
self.start_workers()
def load_proxies(self):
proxies = set()
with self.arguments.proxy_file as fp:
line_num = 0
while (line := fp.readline()):
try:
proxy = parse_proxy_string(line.rstrip())
if not proxy in proxies:
proxies.add(proxy)
except Exception as err:
print(f"Error while parsing line {line_num+1} in proxy file: {err!r}")
finally:
line_num += 1
assert proxies, "Proxy file is empty."
self.proxies.extend(proxies)
def start_log_notifier(self):
thread = Thread(
target=log_notifier,
name="LogNotifier",
daemon=True,
args=(self.log_queue, self.arguments.webhook_url))
thread.start()
def start_stat_updater(self):
thread = Thread(
target=stat_updater,
name="StatUpdater",
daemon=True,
args=(self.count_queue,))
thread.start()
def start_workers(self):
for worker_num in range(self.arguments.workers):
worker = Process(
target=worker_func,
name=f"Worker-{worker_num}",
daemon=True,
kwargs=dict(
thread_count=self.arguments.threads,
log_queue=self.log_queue,
count_queue=self.count_queue,
proxy_list=slice_list(self.proxies, worker_num, self.arguments.workers),
timeout=self.arguments.timeout,
gid_ranges=[
slice_range(gid_range, worker_num, self.arguments.workers)
for gid_range in self.arguments.range
],
gid_cutoff=self.arguments.cut_off,
gid_chunk_size=self.arguments.chunk_size
)
)
self.workers.append(worker)
for worker in self.workers:
worker.start()
def join_workers(self):
for worker in self.workers:
worker.join()
finder.py
from controllers import Controller
from arguments import parse_args
import multiprocessing
if __name__ == "__main__":
multiprocessing.freeze_support()
controller = Controller(arguments = parse_args())
try:
controller.join_workers()
except KeyboardInterrupt:
pass
Hocam ne yapmaya çalışıyorsunuz? Proje olarak*
controllers.py
Python:from workers import worker_func. from threads import log_notifier, stat_updater. from utils import slice_list, slice_range, parse_proxy_string. from multiprocessing import Process, Queue. from threading import Thread. class Controller(): def __init__(self, arguments): self.arguments = arguments. self.workers = [] self.proxies = [] self.count_queue = Queue() self.log_queue = Queue() if self.arguments.proxy_file: self.load_proxies() self.start_stat_updater() self.start_log_notifier() self.start_workers() def load_proxies(self): proxies = set() with self.arguments.proxy_file as fp: line_num = 0 while (line := fp.readline()): try: proxy = parse_proxy_string(line.rstrip()) if not proxy in proxies: proxies.add(proxy) except Exception as err: print(f"Error while parsing line {line_num+1} in proxy file: {err!r}") finally: line_num += 1 assert proxies, "Proxy file is empty." self.proxies.extend(proxies) def start_log_notifier(self): thread = Thread( target=log_notifier, name="LogNotifier", daemon=True, args=(self.log_queue, self.arguments.webhook_url)) thread.start() def start_stat_updater(self): thread = Thread( target=stat_updater, name="StatUpdater", daemon=True, args=(self.count_queue,)) thread.start() def start_workers(self): for worker_num in range(self.arguments.workers): worker = Process( target=worker_func, name=f"Worker-{worker_num}", daemon=True, kwargs=dict( thread_count=self.arguments.threads, log_queue=self.log_queue, count_queue=self.count_queue, proxy_list=slice_list(self.proxies, worker_num, self.arguments.workers), timeout=self.arguments.timeout, gid_ranges=[ slice_range(gid_range, worker_num, self.arguments.workers) for gid_range in self.arguments.range ], gid_cutoff=self.arguments.cut_off, gid_chunk_size=self.arguments.chunk_size ) ) self.workers.append(worker) for worker in self.workers: worker.start() def join_workers(self): for worker in self.workers: worker.join()
Düzenlenenler:
Eki Görüntüle 1633526
finder.py
Python:from controllers import Controller. from arguments import parse_args. import multiprocessing. if __name__ == "__main__": multiprocessing.freeze_support() controller = Controller(arguments = parse_args()) try: controller.join_workers() except KeyboardInterrupt: pass
Düzenlenenler:
Eki Görüntüle 1633533
Dosyaları bu şekilde düzenleyip deneyin. Ayrıca diğer import ettiğiniz dosyalardaki hataları da varsa düzeltin.
Ben bunu başlatırken nasıl konumunu alabilirim?controllers.py
Python:from workers import worker_func from threads import log_notifier, stat_updater from utils import slice_list, slice_range, parse_proxy_string from multiprocessing import Process, Queue from threading import Thread class Controller(): def __init__(self, arguments): self.arguments = arguments self.workers = [] self.proxies = [] self.count_queue = Queue() self.log_queue = Queue() if self.arguments.proxy_file: self.load_proxies() self.start_stat_updater() self.start_log_notifier() self.start_workers() def load_proxies(self): proxies = set() with self.arguments.proxy_file as fp: line_num = 0 while (line := fp.readline()): try: proxy = parse_proxy_string(line.rstrip()) if not proxy in proxies: proxies.add(proxy) except Exception as err: print(f"Error while parsing line {line_num+1} in proxy file: {err!r}") finally: line_num += 1 assert proxies, "Proxy file is empty." self.proxies.extend(proxies) def start_log_notifier(self): thread = Thread( target=log_notifier, name="LogNotifier", daemon=True, args=(self.log_queue, self.arguments.webhook_url)) thread.start() def start_stat_updater(self): thread = Thread( target=stat_updater, name="StatUpdater", daemon=True, args=(self.count_queue,)) thread.start() def start_workers(self): for worker_num in range(self.arguments.workers): worker = Process( target=worker_func, name=f"Worker-{worker_num}", daemon=True, kwargs=dict( thread_count=self.arguments.threads, log_queue=self.log_queue, count_queue=self.count_queue, proxy_list=slice_list(self.proxies, worker_num, self.arguments.workers), timeout=self.arguments.timeout, gid_ranges=[ slice_range(gid_range, worker_num, self.arguments.workers) for gid_range in self.arguments.range ], gid_cutoff=self.arguments.cut_off, gid_chunk_size=self.arguments.chunk_size ) ) self.workers.append(worker) for worker in self.workers: worker.start() def join_workers(self): for worker in self.workers: worker.join()
Düzenlenenler:
Eki Görüntüle 1633526
finder.py
Python:from controllers import Controller from arguments import parse_args import multiprocessing if __name__ == "__main__": multiprocessing.freeze_support() controller = Controller(arguments = parse_args()) try: controller.join_workers() except KeyboardInterrupt: pass
Düzenlenenler:
Eki Görüntüle 1633533
Dosyaları bu şekilde düzenleyip deneyin. Ayrıca diğer import ettiğiniz dosyalardaki hataları da varsa düzeltin.
os.getcwd() ile projenin bulunduğu konumu alabilirsiniz.Roblox oyununda sahipsiz grupları bulmaya yarayan bir kod. Kodu zaten başkasından aldım. Kendim düzenledim. Normalde finder.py ile controllers.py farklı yerlerde. Onları aynı klasöre aldım. Bu hata ortaya çıktı.
Hala aynı.
Ben bunu başlatırken nasıl konumunu alabilirim?
Eki Görüntüle 1633635
os.getcwd() ile projenin bulunduğu konumu alabilirsiniz.
Ayrıca projenizin isterseniz karmaşıklığundan kurtulabilmek için şu projeyi inceleyin:
Roblox-Group-Checker/group-checker.py at main · NovalineDev/Roblox-Group-Checker
YazBel formunda da konu açın.Benim kullandığım ile bu çok farklı. Bunu kullanmam daha da kafa karışıklığına yol açar.
Bu sitenin çalışmasını sağlamak için gerekli çerezleri ve deneyiminizi iyileştirmek için isteğe bağlı çerezleri kullanıyoruz.