Почему не выполняются методы из очереди bull nest js?
Может быть дело в не правильном подключении?
Вот логи очереди:
OK
1710330289.313322 [0 127.0.0.1:60844] "eval" "--[[n Adds a job to the queue by doing the following:n - Increases the job counter if needed.n - Creates a new job key with the job data.nn - if delayed:n - computes timestamp.n - adds to delayed zset.n - Emits a global event 'delayed' if the job is delayed.n - if not delayedn - Adds the jobId to the wait/paused list in one of three ways:n - LIFOn - FIFOn - prioritized.n - Adds the job to the "added" list so that workers gets notified.nn Input:n KEYS[1] 'wait',n KEYS[2] 'paused'n KEYS[3] 'meta-paused'n KEYS[4] 'id'n KEYS[5] 'delayed'n KEYS[6] 'priority'nn ARGV[1] key prefix,n ARGV[2] custom id (will not generate one automatically)n ARGV[3] namen ARGV[4] data (json stringified job data)n ARGV[5] opts (json stringified job opts)n ARGV[6] timestampn ARGV[7] delayn ARGV[8] delayedTimestampn ARGV[9] priorityn ARGV[10] LIFOn ARGV[11] tokenn]]nlocal jobIdnlocal jobIdKeynlocal rcall = redis.callnnlocal jobCounter = rcall("INCR", KEYS[4])nnif ARGV[2] == "" thenn jobId = jobCountern jobIdKey = ARGV[1] .. jobIdnelsen jobId = ARGV[2]n jobIdKey = ARGV[1] .. jobIdn if rcall("EXISTS", jobIdKey) == 1 thenn return jobId .. "" -- convert to stringn endnendnn-- Store the job.nrcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", ARGV[5], "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])nn-- Check if job is delayednlocal delayedTimestamp = tonumber(ARGV[8])nif(delayedTimestamp ~= 0) thenn local timestamp = delayedTimestamp * 0x1000 + bit.band(jobCounter, 0xfff)n rcall("ZADD", KEYS[5], timestamp, jobId)n rcall("PUBLISH", KEYS[5], delayedTimestamp)nelsen local targetnn -- Whe check for the meta-paused key to decide if we are paused or notn -- (since an empty list and !EXISTS are not really the same)n local pausedn if rcall("EXISTS", KEYS[3]) ~= 1 thenn target = KEYS[1]n paused = falsen elsen target = KEYS[2]n paused = truen endnn -- Standard or priority addn local priority = tonumber(ARGV[9])n if priority == 0 thenn -- LIFO or FIFOn rcall(ARGV[10], target, jobId)n elsen -- Priority addn rcall("ZADD", KEYS[6], priority, jobId)n local count = rcall("ZCOUNT", KEYS[6], 0, priority)nn local len = rcall("LLEN", target)n local id = rcall("LINDEX", target, len - (count-1))n if id thenn rcall("LINSERT", target, "BEFORE", id, jobId)n elsen rcall("RPUSH", target, jobId)n endnn endnn -- Emit waiting event (wait..ing@token)n rcall("PUBLISH", KEYS[1] .. "ing@" .. ARGV[11], jobId)nendnnreturn jobId .. "" -- convert to stringn" "6" "bull:parsers:wait" "bull:parsers:paused" "bull:parsers:meta-paused" "bull:parsers:id" "bull:parsers:delayed" "bull:parsers:priority" "bull:parsers:" "" "__default__" "{"project_id":46,"group_id":null}" "{"attempts":1,"delay":0,"timestamp":1710330289308}" "1710330289308" "0" "0" "0" "LPUSH" "46b0491a-5484-43a2-9c7d-9b1be6c59d19"
1710330289.315169 [0 lua] "INCR" "bull:parsers:id"
1710330289.316294 [0 lua] "HMSET" "bull:parsers:41" "name" "__default__" "data" "{"project_id":46,"group_id":null}" "opts" "{"attempts":1,"delay":0,"timestamp":1710330289308}" "timestamp" "1710330289308" "delay" "0" "priority" "0"
1710330289.316377 [0 lua] "EXISTS" "bull:parsers:meta-paused"
1710330289.316402 [0 lua] "LPUSH" "bull:parsers:wait" "41"
1710330289.316437 [0 lua] "PUBLISH" "bull:parsers:waiting@46b0491a-5484-43a2-9c7d-9b1be6c59d19" "41"
Вот сам код:
есть процессор:
import { OnQueueActive, OnQueueCompleted, Process, Processor, } from '@nestjs/bull'; import { Job } from 'bull'; import { Worker } from 'worker_threads'; import { PrismaService } from '@prisma/prisma.service'; import { LoggerService } from '../logger.service'; Processor('parsers'); export class ParserQueueProcess { constructor( private readonly prisma: PrismaService, private readonly loggerService: LoggerService, ) {} @Process() async runPageParser(job: Job) { console.log(job); } @OnQueueActive() onActive(job: Job) { console.log( `Processing job ${job.id} of type ${job.name} with data ${job.data}`, ); } @OnQueueCompleted() onCompleted(job: Job) { console.log(`Job with ${job.id} completed...`); } } |
import { OnQueueActive, OnQueueCompleted, Process, Processor, } from '@nestjs/bull'; import { Job } from 'bull'; import { Worker } from 'worker_threads'; import { PrismaService } from '@prisma/prisma.service'; import { LoggerService } from '../logger.service'; Processor('parsers'); export class ParserQueueProcess { constructor( private readonly prisma: PrismaService, private readonly loggerService: LoggerService, ) {} @Process() async runPageParser(job: Job) { console.log(job); } @OnQueueActive() onActive(job: Job) { console.log( `Processing job ${job.id} of type ${job.name} with data ${job.data}`, ); } @OnQueueCompleted() onCompleted(job: Job) { console.log(`Job with ${job.id} completed...`); } }
есть модуль в котором подключен процессор:
import { Module } from '@nestjs/common'; import { PageParserService } from './page-parser.service'; import { PageParserController } from './page-parser.controller'; // import { ParsersQueueModule } from '../parsers-queue/parsers-queue.module'; import { BullModule } from '@nestjs/bull'; import { ParserQueueProcess } from './parser-queue.process'; @Module({ imports: [ BullModule.registerQueue({ name: 'parsers', }), ], controllers: [PageParserController], providers: [PageParserService, ParserQueueProcess], exports: [PageParserService], }) export class PageParserModule {} |
import { Module } from '@nestjs/common'; import { PageParserService } from './page-parser.service'; import { PageParserController } from './page-parser.controller'; // import { ParsersQueueModule } from '../parsers-queue/parsers-queue.module'; import { BullModule } from '@nestjs/bull'; import { ParserQueueProcess } from './parser-queue.process'; @Module({ imports: [ BullModule.registerQueue({ name: 'parsers', }), ], controllers: [PageParserController], providers: [PageParserService, ParserQueueProcess], exports: [PageParserService], }) export class PageParserModule {}
И есть сервис:
import { Injectable } from '@nestjs/common'; import { PrismaService } from '@prisma/prisma.service'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; import { PageParserRun } from './interfaces'; @Injectable() export class PageParserService { constructor( private readonly prisma: PrismaService, @InjectQueue('parsers') private readonly queue: Queue, ) {} async runParser(project_id: number, group_id: number | null) { const pageParserRun: PageParserRun = { project_id: project_id, group_id: group_id, }; console.log('Добавлено в очередь'); try { await this.queue.add(pageParserRun); console.log(this.queue.client.status); return this.queue.client.status; // await this.getData(project_id, null); } catch (e) { console.error(e); } } async getData(project_id: number, group_id: number | null) { return await this.prisma.pageParserData.findMany({ where: { project_id: project_id, }, }); } } |
import { Injectable } from '@nestjs/common'; import { PrismaService } from '@prisma/prisma.service'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; import { PageParserRun } from './interfaces'; @Injectable() export class PageParserService { constructor( private readonly prisma: PrismaService, @InjectQueue('parsers') private readonly queue: Queue, ) {} async runParser(project_id: number, group_id: number | null) { const pageParserRun: PageParserRun = { project_id: project_id, group_id: group_id, }; console.log('Добавлено в очередь'); try { await this.queue.add(pageParserRun); console.log(this.queue.client.status); return this.queue.client.status; // await this.getData(project_id, null); } catch (e) { console.error(e); } } async getData(project_id: number, group_id: number | null) { return await this.prisma.pageParserData.findMany({ where: { project_id: project_id, }, }); } }
Дополнительно:
Оберни в тег кода!
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { PrismaModule } from '@prisma/prisma.module';
import { APP_GUARD } from '@nestjs/core';
import { JwtAuthGuard } from '@auth/guards/jwt-auth.guard';
import { ConfigModule } from '@nestjs/config';
import { BacklinksModule } from './backlinks/backlinks.module';
import { FastCheckModule } from './fast-check/fast-check.module';
import { MainLinksModule } from './main-links/main-links.module';
import { GoogleSearchContentModule } from './google-search-content/google-search-content.module';
import { UsersModule } from '@users/users.module';
import { AuthModule } from '@auth/auth.module';
import { ProjectsModule } from './projects/projects.module';
import { PageParserModule } from './page-parser/page-parser.module';
import { BullModule } from '@nestjs/bull';
// import { PageParserService } from './page-parser.service';
// import { ParsersModule } from './parsers/parsers.module';
// import { ParsersModule } from './parsers/parsers.module';
// import { ParsersQueueModule } from './parsers-queue/parsers-queue.module';
// import { BullWorkerService } from './bull-worker/bull-worker.service';
@Module({
imports: [
AuthModule,
PrismaModule,
BacklinksModule,
FastCheckModule,
MainLinksModule,
GoogleSearchContentModule,
UsersModule,
ConfigModule.forRoot({ isGlobal: true }),
ProjectsModule,
PageParserModule,
// BullModule,
BullModule.forRoot({
redis: {
host: '127.0.0.1',
port: 6379,
},
}),
// BullModule.registerQueue({
// name: 'parsers',
// }),
// ParsersQueueModule,
],
controllers: [AppController],
providers: [
AppService,
{
provide: APP_GUARD,
useClass: JwtAuthGuard,
},
// BullWorkerService,
],
})
export class AppModule {}
Синтаксическая ошибка
Опишите проблему, и специалист поможет с настройкой, исправлением ошибки или доработкой сайта. Подберём понятный план работ без лишней переписки.
Пока нет других ответов. Будьте первым, кто поможет автору.
Ответить на вопрос
Для того чтобы понять, почему методы из очереди Bull не выполняются в Nest.js, необходимо провести анализ нескольких возможных причин.
1. Неправильная настройка очереди Bull: убедитесь, что вы правильно настроили и запустили очередь Bull в вашем приложении Nest.js. Убедитесь, что вы создали экземпляр очереди Bull и добавили необходимые обработчики для выполнения задач.
import { Injectable } from '@nestjs/common'; import { Queue } from 'bull'; @Injectable() export class BullQueueService { private readonly queue: Queue; constructor() { this.queue = new Queue('myQueueName'); this.queue.process(async (job) => { // выполнение задачи }); } async addJob(data: any) { await this.queue.add(data); } }
2. Проблемы с подключением к Redis: Bull требует Redis для хранения задач в очереди. Убедитесь, что вы правильно сконфигурировали подключение к Redis в вашем приложении Nest.js. Проверьте правильность настроек подключения к Redis, а также доступность Redis сервера.
3. Ошибки в обработчике задачи: если методы из очереди Bull не выполняются, возможно, в вашем обработчике задачи есть ошибки, которые препятствуют выполнению задач. Убедитесь, что ваш обработчик задачи корректно обрабатывает входные данные и выполняет необходимые действия.
4. Логирование ошибок: для выявления возможных проблем с выполнением задач из очереди Bull, рекомендуется добавить логирование ошибок. Проверьте логи вашего приложения Nest.js, чтобы выявить возможные ошибки или исключения, которые могут возникать при выполнении задач.
После тщательного анализа перечисленных выше причин вы должны быть в состоянии определить, почему методы из очереди Bull не выполняются в вашем приложении Nest.js и принять необходимые меры для их исправления.