import concurrent.futures
import time
def task(i):
# Your task logic here, for example:
time.sleep(i)
print(f"Task {i} completed")
def main():
timeout = 2 # Set your desired timeout in seconds
tasks = range(5) # This represents your lopo
with concurrent.futures.ThreadPoolExecutor() as executor:
for i in tasks:
try:
executor.submit(task, i).result(timeout)
except concurrent.futures.TimeoutError:
print(f"Task {i} timed out")
if __name__ == "__main__":
main()
if using requests
, use the timeout
parameter:
r = requests.get(shortened_url, timeout=15, headers={"User-Agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36"})