多线程模块threading的简单使用

发布于 2020-06-19  583 次阅读


from aip import AipOcr
import requests, time, re, random, threading

space = re.compile(r'\s+')
htag = re.compile(r'<[^>]+>')

""" 你的 APPID AK SK """
APP_ID = ''
API_KEY = ''
SECRET_KEY = ''

client = AipOcr(APP_ID, API_KEY, SECRET_KEY)
options = {}
options["language_type"] = "CHN_ENG"
options["detect_direction"] = "true"
options["detect_language"] = "true"

def BaiduOCR(image, options=options):
    if isinstance(image, str):
        results = client.basicGeneralUrl(image, options)
    else:
        results = client.basicGeneral(image, options)
    if 'error_code' in results:
        print (f'BaiduOCR error {results["error_code"]} {results["error_msg"]}')
        return ''
    return ''.join(line['words'] for line in results['words_result'] if 'words' in line)
    
def jsonfy(s:str)->object:
    #此函数将不带双引号的json的key标准化
    if s[0] not in ('{','['): print(s + '\n' + 'jsonfy 登录失效,请重新获取Cookie!')
    try:
        obj = eval(s, type('js', (dict,), dict(__getitem__=lambda s, n: n))())
        return obj
    except Exception:
        return {}
    

def getScLc(d):
    return (int(d['sc']), int(d['lc']))

def getXkList(r):
    a = r.text.find('[')
    b = r.text.find(r';/*sc 当前人数, lc 人数上限*/')
    c = r.text.find('{', b)
    print(f'getXkList {a} {b} {c}')
    if b==-1 or c==-1: print(r.text); time.sleep(0.5); return
    j = jsonfy(r.text[a:b])
    j2 = jsonfy(r.text[c:])
    return [(one['id'], one['no'], one['name'], getScLc(j2[str(one['id'])])) for one in j]

def getData(Form = None):
    if Form:
        r = requests.post('', data = Form, headers=headers)
    else:
        r = requests.get(r''
    print(f'getData status_code {r.status_code}')
    if '请不要过快点击' in r.text: time.sleep(random.randint(3,9)/3)
    return r

def getTimestamp(size = 1000):
    t = time.time()
    return int(round(t * size))

def getCaptcha():
    r = requests.get(f'', headers=headers)
    print(f'getCaptcha status_code {r.status_code}')
    img = r.content
    text = BaiduOCR(img)
    print(f'getCaptcha results {text}')
    return text.replace(' ','')

def Xk(CourseId, lock):
    print(f'Xk 怎么会死锁? {lock.locked()}')
    with lock:
        data = {
        "optype": "true",
        "operator0": f"{CourseId}:true:0",
        "captcha_response": getCaptcha()
        }
        r = requests.post(r'', data = data, headers=headers)
    print(f'Xk status_code {r.status_code}')
    return htag.sub(' ', space.sub('', r.text))

def Xk2S(CourseId, lock):
    i = 3
    ord = 1
    while i >= 0:
        st = Xk(CourseId, lock)
        if '选课成功' in st:
            print(f'Xk2S 第{ord}次选课成功!')
            return True
        else:
            print(f'Xk2S 第{ord}次选课:')
            print(st)
            assert '选课失败:公选人数已满' not in st, 'Xk2S 公选人数已满 无法继续'
            assert '选课失败:你已经选过' not in st, 'Xk2S 选课成功 无需继续'
            if '操作失败:验证码错误' not in st: i -= 1
        ord += 1
    else:
        return False
#PHPM110047.01

from headers import headers

def getCourse(Form):
    Courses = getXkList(getData(Form))
    while not Courses:
        print(f'getCourse 想必是要死循环了 {Courses}')
        time.sleep(random.randint(5,16)/2)
        Courses = getXkList(getData(Form))
        
    print (Courses)
    for Course in Courses:
        if Course[1] in Form['lessonNo']:
            print (Course)
            return Course
    assert False, 'getCourse 出现未知错误,请重试!'

def isSuccessful(Form):
    time.sleep(5)
    Courses = getXkList(getData())
    print (Courses)
    for Course in Courses:
        if Course[1] == Form['lessonNo']:
            print (Course)
            return True
    return False

#Form['lessonNo'] = 'PEDU110091.19'

def start(lessonNo, lock, event):
    Form = {
    "lessonNo": lessonNo,
    "courseCode": "",
    "courseName": ""
    }
    time.sleep(random.randint(0,50)/10)
    print(f'start {lessonNo} {id(lock)} {id(event)}')
    while True:
        try :
            Course = getCourse(Form)
            if Course[3][0] < Course[3][1]:
                if Xk2S(Course[0], lock) and isSuccessful(Form):
                    print(f'主循环 选课成功 {time.ctime(time.time())}')
                    break
                else:
                    print(f'主循环 选课失败 继续尝试 {time.ctime(time.time())}')
            else:
                print(f'主循环 人数已达上限 继续等待 {time.ctime(time.time())}')
            event.set()
            time.sleep(random.randint(20,50)/5)
        except requests.ConnectionError:
            print('网络超时, 延时等待1分钟')
            for i in range(6):
                event.set()
                time.sleep(10)

printf = print
def wrapprint(*args, **kw):
    printf(threading.current_thread().name, ":",end='')
    printf(*args, **kw)
print = wrapprint





import threading, time, ctypes, inspect

def _async_raise(tid, exctype):
    """raises the exception, performs cleanup if needed"""
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        #raise ValueError("invalid thread id")
        return "ValueError: invalid thread id"
    elif res != 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        #raise SystemError("PyThreadState_SetAsyncExc failed")
        return "SystemError: PyThreadState_SetAsyncExc failed"
    return "success!"
def startThread(tasks, timeout=35):
    lock = threading.Lock()
    tasks = [(target, args, threading.Event()) for target,args in tasks]
    #for task in tasks: task['args'] = task.get('args',()) + (lock, watchdog)
    threads = [[threading.Thread(target=target, args=args+(lock, event)), event] for target,args,event in tasks]
    #if threads: timeout = timeout // len(threads) + 1
    for t,event in threads: t.setDaemon(True)
    for t,event in threads: t.start()
    print('startThread 守护线程开始执行!')
    counter = 1
    while any(t.is_alive() for t,event in threads):
        print(f'startThread 守护线程第{counter}轮 开始 {time.ctime(time.time())}')
        time.sleep(timeout)
        for t,event in threads:
            if not event.isSet() and t.is_alive(): break
            time.sleep(0.1)
        else: print(f'startThread 守护线程第{counter}轮 正常 {time.ctime(time.time())}'); counter += 1;continue
        print(f'startThread 守护线程第{counter}轮 发生异常,准备重启... {time.ctime(time.time())}')
        lock = threading.Lock()
        restart = []
        for i,one in enumerate(threads):
            t, event = one
            if not t.is_alive(): print(f'startThread 线程自然退出,忽略 {t.name}'); continue
            if event.isSet(): event.clear()
            else:
                print(f'startThread 正在结束线程 {t.name} {_async_raise(t.ident, SystemExit)}')
                target,args,event = tasks[i]
                t = threads[i][0] = threading.Thread(target=target, args=args+(lock, event))
                t.setDaemon(True)
                restart.append(t) 
        for t in restart: t.start(); print(f'startThread 启动替代线程 {t.name}')
        del restart
    print('startThread 守护线程准备退出中...')
    for t,event in threads:
        if t.is_alive(): print(f'startThread 正在结束线程 {t.name} {_async_raise(t.ident, SystemExit)}')
    print('startThread 守护线程已退出!')

from FDUxk1 import start

cl = [] 
#cl.append('PEDU110091.19') #乒乓球
cl.append('PHPM110047.01') #医院经营管理
#cl.append('TOUR110001.01') #旅游与经济管理
cl.append('PTSS110016.01') #当代世界经济与政治
cl.append('PHIL119056.01') #哲学视野中的人工智能

args = [(start, (c,)) for c in cl]
t = threading.Thread(target=startThread, args=(args,))
t.start()
while t.is_alive():
    t.join(120)
    print(f'每2分钟一次,预防死机 {time.ctime(time.time())}')
inp = input('等待退出...')

医学生