主要步骤:
- 获取ip列表
- 检测ip是否有效
import asyncio
import time
import aiohttp
from proxy_redis import ProxyRedis # 自定义ip管理类
async def test_ip(ip, p_r, sem):
try:
# 信号量并发控制
async with sem:
async with aiohttp.ClientSession() as session:
async with session.get('http://httpbin.org/ip', proxy='http://'+ip, timeout=10) as resp:
con = await resp.text() # 获取数据
if con:
# 设置最大权重
p_r.zset_zadd(ip)
else:
# 降低权重
p_r.zset_zincrby(ip)
print(ip, '降低权重')
except Exception as e:
print(ip, e)
# 降低权重
p_r.zset_zincrby(ip)
print(ip, '降低权重')
async def main():
# 实例化处理ip的类
p_r = ProxyRedis()
ip_list = p_r.zset_zrange() # 获取所有ip
sem = asyncio.Semaphore(100) # 并发控制
if ip_list:
# print(ip_list)
tasks = [] # 存放所有的任务
for ip in ip_list:
# 添加任务
tasks.append(asyncio.create_task(test_ip(ip, p_r, sem)))
await asyncio.wait(tasks)
def run():
while True:
try:
asyncio.run(main())
time.sleep(15)
except Exception as e:
print('ip测试异步出现错误', e)
if __name__ == '__main__':
# asyncio.run(main())
run()
- 对ip进行标识和存储
import random
from settings import * # 导入所有的配置
import redis
class ProxyRedis:
def __init__(self):
# 连接reids数据库
self.r = redis.Redis(host=HOST, port=PORT, password=PASSWORD, db=DB, decode_responses=True)
# 添加ip
def zset_zadd(self, ip):
self.r.zadd(ZSET_NAME, {
ip: SCORE})
# 降低权重
def zset_zincrby(self, ip):
# 先查出权重
score = self.r.zscore(ZSET_NAME, ip)
# 判断当前的权重是否小于最低权重
if score > MIN_SCORE:
# 将当前权重减少1
self.r.zincrby(ZSET_NAME, -1, ip)
else:
# 删除ip
print('ip:', ip,'低于最低权重,删除')
self.r.zrem(ZSET_NAME, ip)
def get_ip(self):
''' 返回高权重的ip '''
ip = self.r.zrangebyscore(ZSET_NAME, SCORE, SCORE, 0, -1)
if ip:
# 随机返回一个ip
return random.choice(ip)
else:
ip = self.r.zrangebyscore(ZSET_NAME, 80, SCORE, 0, -1)
if ip:
# 随机返回一个ip
return random.choice(ip)
else:
print('实在没有了~')
# 返回所有的ip 以供test_ip 进行测试
def zset_zrange(self):
return self.r.zrange(ZSET_NAME, 0, -1)
- 请求这个文件的接口
from flask import Flask # pip install flask
from proxy_redis import ProxyRedis
app = Flask(__name__)
@app.route('/')
def index():
pr = ProxyRedis()
ip = pr.get_ip() # 返回可用性比较大的ip
if ip:
return ip
return '来了老弟'
# 只要调用run 就可以运行当前的flask框架
def run():
app.run() # 运行flask
if __name__ == '__main__':
app.run(debug=True) # 运行flask
主函数
from multiprocessing import Process
from get_ip import get_ip
from app import run
from test_ip import run as t_run
def main():
Process(target=get_ip).start() # 开启抓ip的进程
Process(target=run).start() # 开启web后去ip的借口的进程
Process(target=t_run).start() # 开启测试ip的进程
if __name__ == '__main__':
main()
文章评论