求助 Java 大量任务分布式处理的问题

讨论 未结 24 32
yesterdaysun
yesterdaysun 会员 2022年4月26日 04:47 发表
<p>问题是这样, 现在系统中有大量去和第三方 API 交互的任务, 比如有 1000 个用户, 每个用户又有各自 1 万个小的记录去和第三方 API 慢慢交互, 或者没有那么多记录但是有一个很耗时的同步接口, 可能 10 分钟以上, 其实时间都是消耗在网络 IO 上, 大部分时间在等网络, 之前的方式就是一个线程池, 把所有大小任务塞进去, 但是这个线程池大小很难搞, 多了的话, 有时会突然来一堆任务占住 CPU 和数据库, 少了的话, 一大堆任务又阻塞住.</p> <p>现在想搞成分布式好几台机器一起跑, 考察了一下方案, 有点迷惑:</p> <ol> <li>一种是分布式任务队列, 看到一个 Celery 好像是这种, 但是这个 python 的, 我想要 Java 的, 结果没找到</li> <li>一种是任务调度框架, quartz, xxljob 这种, 感觉我想要的更靠近这种, 但是又有点迷惑, 比如感觉我这种需求适合"分片广播"这种任务, 比如我把 1000 个用户的任务分片到 3 台机器, 但是然后每台机器上的任务为每个用户再单独为他名下的 1 万条记录自己做线程池请求? 或者我把任务拆到单个小记录的级别, 那岂不是得成千上万的 trigger, 然后任务调度又一般是一个主 job, 然后传参数这种, 那比如我要确保一个时间只有一个用户的任务在跑, 怎么做这个限制, 全要自己在任务中处理吗</li> </ol> <p>所以, 其实就是我想找一个比较现成的框架, 能处理超长的任务队列, 分布式, 并发的执行, 可以自动削峰填谷, 有一些任务自动处理, 比如重试, 故障转移等等, 又能够有一些保证一致性的机制, 比如按 job+某个参数确保不会重复执行, 还能程序方式发起调度, 而不是在某个管理后台手动编辑</p> <p>我想知道这样的东西存在吗, 还是必须自己实现, 求各位大佬赐教</p>
收藏(0)  分享
相关标签: 灌水交流
注意:本文归作者所有,未经作者允许,不得转载
24个回复
  • biubiuF
    2022年4月26日 04:47
    你需要 kafka ,把你现在的 jobs 弄成消费者
    0 0
  • RedBeanIce
    2022年4月26日 05:16
    可能是 xxjob ????我记得有分片处理,,,就是一堆小任务,大家去处理
    0 0
  • bthulu
    2022年4月26日 05:16
    既然时间都消耗在网络 IO 上, 上 windows 系统, 用 IOCP 去调接口, 单机就能搞定了, 用不着搞这么多骚操作
    0 0
  • jorneyr
    2022年4月26日 05:16
    Kafka 的每个 partition 一个消费者组里同时只能有一个消费者进行消费,这种情况我觉得 RabbitMQ 可能更合适,不必明确的限制消费者个数,看情况随时动态增减消费者,每个消息可以使用阻塞的方式执行。
    0 0
  • lmshl
    2022年4月26日 06:49
    改异步纤程,你这才一千万个 IO 小任务,犯不着上分布式。Akka Stream (调度) + Akka HTTP (调 API ) 随便搞一搞单机就完事了
    0 0
  • ming159
    2022年4月26日 06:49
    如你所说:“其实时间都是消耗在网络 IO 上” 线程是不解决 IO 问题的,你需要的是 异步 IO 处理机制。一个线程同时处理多个 IO ,而不是一个线程处理一个 IO 。
    0 0
  • ymmud
    2022年4月26日 06:49
    akka cluster sharding , 根据需求分片就行了
    0 0
  • lmshl
    2022年4月26日 06:49
    我写过一个 所有 fiber 去数据库查任务状态,select * from tasks where state = 'todo',然后执行这一批任务,更新任务状态。 最后并行 128 同时跑所有 fiber
    0 0
  • git00ll
    2022年4月26日 07:49
    `但是有一个很耗时的同步接口, 可能 10 分钟以上, 其实时间都是消耗在网络 IO 上, 大部分时间在等网络` 这句话不明白,啥接口要耗时 10 分钟? 等网络是什么意思。如果接口一次请求响应要 10 分钟,多开点线程如 200-300 个,网络堵塞的时候是不会大量占用 cpu 的。关键如果接口能否承受这么高并发数。
    0 0
  • 5boy
    2022年4月26日 08:16
    mark, 有没有不用大数据框架实现的方式?
    0 0
  • litchinn
    2022年4月26日 08:49
    ,隔壁刚提出的这个动态线程池不知道能不能实现这个需求。另外你说线程池大小不好调,换成分布式多个机器跑,那节点数量不是一样需要调整吗,k8s 弹性伸缩?
    0 0
  • Saurichthys
    2022年4月26日 09:18
    不要用 xxl-job 的方案,基于数据库,性能不佳,莫名其妙问题很多
    0 0
  • yesterdaysun
    2022年4月26日 10:18
    说的不清楚, 其实是一个长流程, 比如请求一个报告, 但是不会立即返回, 需要等第三方处理好, 才能拿到, 中间就每隔 1-2 分钟去轮询一次看看报告有没有好, 通常都要 10 分钟左右, 关键不是每种任务都是这样的, 如果单为它建一个线程池又感觉有点过了, 想搞个通用的解法 上面的我都研究了一下, 我这个系统比较简单, 本身就是个单体, 并不是分布式的, 这次也只是想要把这个后台任务独立出去搞多机并行, 感觉我这个还不到动用 akka/协程之类的方案的地步, 应该还是简单点, 一个简单的调度系统加动态线程池就足够了, 美团开源的那个动态线程池看上去比较适合, 我先研究一下试试看
    0 0
  • jekkro
    2022年4月26日 10:48
    用 redis 实现异步队列即可,一个进程专门负责插入任务到 Redis 队列中,另外几个负责从队列中获取信息并执行,完成后更新数据库里的状态。如果发生 Redis 所在的机器 down 机,则负责插入任务的那个进程重新把没有完成的再插入一遍(不过这个目前为止还没有发生过)。我有类似的业务,已经跑了 12 年了。 另外因为 Redis 有各种复杂数据结构,可以满足延时队列,优先级队列,自动去重等功能。感觉性能优秀,代码简单。
    0 0
  • jekkro
    2022年4月26日 10:48
    不能用非阻塞 io 的原因一般是因为那些接口库不是自己实现的,没办法去改造那些接口底层库,虽然 http 的接口自己也可以实现,但是有些场景(比如各种开放平台的接口库)不可能把第三方提供的接口库重新写一边,而仅仅是为了解决阻塞 io 的问题。
    0 0
  • lmshl
    2022年4月26日 10:48
    以上技术方案中,综合代码量和开发难度来看,从易到难依次应该是 纤程 >> Akka Stream > nio-pool > xxjob/scheduler > 动态线程池屎上雕花 >> akka cluster sharding >> akka cluster without sharding 纤程是真的简单,你这需求 20-50 行左右就完事了,不就是个 flow = post(...) >> (sleep(1.minutes) *> check(xxx)).retryWhile(isCompleted) >> retrieve() 然后 tasks.foreachPar(<你想开多大并行>)(flow) 的事
    0 0
  • outoftimeerror
    2022年4月26日 10:48
    我也写 java ,不过你这个需求让我选型的话,我会用 golang (goroutine+chan)+ redis
    0 0
  • XhstormR02
    2022年4月26日 11:49
    java 的纤程 Quasar ,最近一次更新是 2018 年,都好多年没更新了 ,倒不如用 kotlin 的 coroutines
    0 0
  • dddd1919
    2022年4月26日 13:45
    显然是该上 MQ 了,把用户放到队列,由消费端去挨个处理用户任务,如果单个用户跑的话配一个消费任务就够了 强业务需求建议 RabbitMQ/RocketMQ
    0 0
  • lmshl
    2022年4月26日 14:18
    反正我说的也不是 Java 🐶 其实上面写的是 Scala 伪代码 🐶 档燃,Kotlin 也不错,起码有 suspend/await 可以用,不像 IO Monad 要切换编程思维
    0 0