Skip to content

Instantly share code, notes, and snippets.

@Ghost-chu
Last active February 5, 2025 12:49
Show Gist options
  • Select an option

  • Save Ghost-chu/1abdb315436f3ace0e29f0629049b04d to your computer and use it in GitHub Desktop.

Select an option

Save Ghost-chu/1abdb315436f3ace0e29f0629049b04d to your computer and use it in GitHub Desktop.
archives.mcbbs.co spider
我正在使用 Java 23 编写爬虫,爬虫目标地址是:
https://archives.mcbbs.co/read.php?tid=<TID>
TID 是从 1 开始的整数,上不封顶
当链接有效时,返回 200 状态码,并包含网页内容。此时你需要将整个页面存储到工作目录下的 archive 目录中的 <TID>.html 文件。
特别注意的是,即使链接无效,也会返回 200 状态码,但内容固定为字符串 “
文件不存在”。
除此以外,我还要求并发爬取,但爬取速度不能超过 20req/s,并且 UserAgent 必须为:mcbbs-archives-indexer (@Ghost-chu [2908803755], speed 20req per seconds)
同时将所有有效链接存储到 archives/links.txt,一行一个
嗯,用户需要写一个Java 23的爬虫,目标URL结构是固定的,TID从1开始递增。首先,我需要理清楚需求。用户要求并发爬取,但速度不超过20次每秒,并且要设置特定的UserAgent。同时,有效页面要保存到archive目录下的TID.html文件,无效的内容是“文件不存在”,这时候不需要保存。另外,所有有效的链接要存到links.txt里,每行一个。
首先,Java 23可能有一些新的特性,但考虑到兼容性,可能需要使用常见的并发库。并发的话,可以考虑使用虚拟线程(JEP 425),因为Java 19开始引入的,可能用户希望用最新的特性。不过要确保正确控制并发量,避免超过20req/s。这里可能需要一个速率限制器,比如使用Semaphore或者RateLimiter,但Java标准库中没有直接的RateLimiter,可能需要自己实现,比如用定时任务补充许可,或者用Semaphore配合定时器。
然后,处理HTTP请求。应该用HttpClient,Java 11引入的,支持异步。设置UserAgent很简单,在请求头里设置就行。需要处理响应状态码,不过用户说即使无效也返回200,所以关键是根据内容判断是否有效。如果返回的内容是“文件不存在”,则忽略,否则保存文件,并记录链接到links.txt。
存储文件的部分,要注意目录结构,archive目录是否存在,需要检查并创建。每个TID生成对应的html文件。同时,links.txt需要追加写入,每个有效TID一行。这里要注意并发写入文件的问题,多个线程同时写links.txt可能会出问题,所以需要一个同步机制,比如使用一个线程安全的队列,或者使用锁来保证写入的原子性。
关于并发控制,用户要求20req/s,所以每秒最多20个请求。可以考虑使用一个定时任务,每秒释放20个许可的Semaphore。或者使用ExecutorService来控制线程数,但需要考虑每个请求的延迟。比如,固定延迟调度,或者使用一个ScheduledExecutorService来定期提交任务。
不过更高效的方式可能使用异步HTTP客户端,配合信号量控制并发请求数。比如,使用HttpClient的异步方法,然后通过信号量限制同时进行的请求数。不过需要注意,每秒不超过20个,可能需要更精确的速率控制,比如每个请求间隔50毫秒(1000/20=50)。不过间隔的方式可能在并发时不够准确,可能需要用更精确的速率控制,比如令牌桶算法。
另外,需要考虑TID的生成是连续的,从1开始递增,但并发爬取时,需要确保TID的顺序?或者可以乱序处理,只要最后保存的文件是按TID命名的。用户可能希望尽可能快地爬取,所以可能不需要按顺序处理,但生成TID的时候需要原子递增,避免重复或跳过。
可能的步骤:
1. 创建必要的目录,archive和links.txt的父目录。
2. 使用HttpClient发送请求,设置UserAgent。
3. 判断响应内容是否为“文件不存在”,如果不是,保存文件到archive/<TID>.html,并记录TID到links.txt。
4. 控制请求速率为20/s。
5. 并发处理多个TID,同时不超过速率限制。
实现并发和速率限制的方法:
- 使用ExecutorService配合信号量,每个请求前获取许可,许可的发放速率控制为20/s。
- 使用ScheduledExecutorService来定时提交任务,比如每50ms提交一个任务,但这样可能不够灵活,如果有任务处理时间较长,会导致积压。
- 或者,使用一个RateLimiter,比如Guava库中的RateLimiter,但用户可能希望用标准库。不过Java 23是否引入了类似工具?如果用户允许使用第三方库,可能需要考虑,但这里可能希望纯Java实现。
另一个思路是使用虚拟线程,因为Java 21+支持虚拟线程,可以创建大量虚拟线程,但通过信号量控制并发请求数不超过20,不过这样可能无法精确控制每秒的总数,而是控制同时并发的数量。但20个并发请求可能在1秒内处理超过20个请求,如果每个请求处理时间小于1秒的话。比如,如果每个请求耗时0.5秒,那么20并发的话,每秒可以处理40个,这就会超过限制。所以信号量控制并发数可能不够,需要更精确的速率控制。
所以可能需要结合时间窗口,记录每个请求的时间,确保在任意1秒内不超过20个请求。例如,维护一个时间队列,记录最近20个请求的时间,每次发起新请求前检查最旧的时间是否在1秒以前,如果是,则可以发起,否则等待。不过这样实现起来比较复杂。
或者,使用一个定时器,每秒允许20个请求,通过一个Semaphore,初始为20,每秒钟补充到20。例如,启动一个定时任务,每秒释放(20 - 当前可用许可数)的许可,这样每秒最多允许20个请求。这样实现起来相对简单。
在Java中,可以创建一个Semaphore,初始许可为20,然后使用ScheduledExecutorService每秒补充许可。例如:
Semaphore semaphore = new Semaphore(20);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
int available = semaphore.availablePermits();
if (available < 20) {
semaphore.release(20 - available);
}
}, 0, 1, TimeUnit.SECONDS);
然后,每个请求在执行前需要获取一个许可,这样就能保证每秒最多20个请求。不过需要注意,Semaphore的release方法可以在任何线程调用,所以需要确保线程安全。
这样,在发起每个请求之前,先调用semaphore.acquire(),这样就能控制速率。
接下来,处理TID的生成。因为TID从1开始,逐步递增,可能需要一个原子整数来记录当前TID。例如:
AtomicInteger tid = new AtomicInteger(1);
然后,在循环中,或者使用一个线程池,不断提交任务,每个任务处理当前的TID,并递增。但需要注意,当处理完成一个TID后,才处理下一个,或者可能同时有多个TID在处理。但这里用户需要爬取所有的TID,从1开始,直到遇到“文件不存在”为止?或者是否需要一直爬取到某个错误率?或者用户是否希望一直尝试直到所有可能的TID都被处理?
根据问题描述,TID是上不封顶的,所以可能爬虫需要一直运行,直到无法找到有效页面为止。但用户可能需要处理的是,当某个TID返回“文件不存在”时,可能停止,或者继续尝试后续的TID?或者是否需要继续检查后面的TID?
这个问题需要明确。根据用户描述,“当链接有效时,返回200,内容有效;否则返回200,但内容是‘文件不存在’”。所以,爬虫需要不断尝试每个TID,直到某个点之后,可能大部分TID都是无效的,但可能有间隙的有效TID。或者可能TID是连续的,直到某个最大值,之后都是无效的。但用户可能并不知道最大值,所以需要持续爬取,直到用户终止程序。
所以程序可能需要无限运行,不断尝试TID,直到手动停止。或者当连续多个TID无效时停止?但用户没有提到这一点,所以可能不需要,程序应持续运行。
因此,程序需要不断生成TID,从1开始,依次递增,每个TID都去请求,判断是否有效,然后处理。
这种情况下,需要一个生产-消费者模型,生产者不断生成TID,消费者(多个线程)处理这些TID。但需要考虑如何控制生产的速度,避免生成太多TID导致内存溢出。可以使用一个阻塞队列,生产者将TID放入队列,消费者从队列中取出处理。不过,TID是连续的,所以可以用一个原子整数来代表当前待处理的TID,然后多个消费者线程竞争获取下一个TID。
例如,每个消费者线程在准备好处理下一个TID时,使用tid.getAndIncrement()来获取当前TID,并处理。但这样可能导致多个线程同时处理不同的TID,但需要控制总的请求速率不超过20/s。可能更好的方式是,将TID的生成与任务提交解耦,使用一个固定的线程池,每个线程处理一个范围内的TID,但这样可能难以控制总速率。
或者,主线程负责生成TID,并通过一个队列将TID分发给工作线程,工作线程在每次处理前获取许可,控制速率。但需要考虑队列的大小,避免内存问题。
可能更简单的方式是,使用一个固定大小的线程池,比如20个线程,每个线程处理一个TID,并且在每个请求之间等待足够的时间,比如每个线程每秒处理一个请求。但这样可能无法达到20req/s的总速率,因为20个线程每个处理1req/s,总共是20req/s。这可能是一个简单的方式,每个线程在完成一个请求后,等待50ms,然后处理下一个TID。例如:
ExecutorService executor = Executors.newFixedThreadPool(20);
for (int i = 0; i < 20; i++) {
executor.submit(() -> {
while (true) {
int currentTid = tid.getAndIncrement();
// 处理currentTid
Thread.sleep(50); // 每个请求间隔50ms,每秒20个
}
});
}
但这种方法的问题是,如果处理一个请求的时间超过50ms,那么实际速率会低于20req/s。而如果处理时间很短,比如50ms刚好,那么总速率可以达到20req/s。这可能不够精确,但可能足够接近用户的需求。不过需要考虑网络请求的延迟,如果请求处理时间较长,这种方法可能无法满足速率要求。
另一种更准确的方法是使用一个ScheduledExecutorService,每秒调度20个任务,每个任务处理一个TID。例如:
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(20);
AtomicInteger tid = new AtomicInteger(1);
Runnable task = () -> {
int currentTid = tid.getAndIncrement();
// 处理currentTid
};
for (int i = 0; i < 20; i++) {
scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
}
不过这样每个任务每秒执行一次,20个任务每秒总共执行20次。这可能更精确,但每个任务只能处理一个TID每秒,需要确保任务执行时间不超过1秒。这可能不太灵活,因为如果某些请求耗时较长,可能堆积任务。
回到之前的Semaphore方法,结合虚拟线程可能更高效。Java 23可能默认支持虚拟线程,可以使用Executors.newVirtualThreadPerTaskExecutor()来创建大量虚拟线程,但通过Semaphore控制每秒的请求数。
例如:
Semaphore rateLimiter = new Semaphore(20);
ScheduledExecutorService semaphoreRefiller = Executors.newSingleThreadScheduledExecutor();
semaphoreRefiller.scheduleAtFixedRate(() -> rateLimiter.release(20 - rateLimiter.availablePermits()), 1, 1, TimeUnit.SECONDS);
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
AtomicInteger tid = new AtomicInteger(1);
while (true) {
rateLimiter.acquire(); // 等待直到有许可可用
int currentTid = tid.getAndIncrement();
executor.submit(() -> processTid(currentTid));
}
这里,每秒补充Semaphore到20许可,每次处理一个TID需要获取一个许可。这样,每秒最多处理20个请求。但需要注意,Semaphore的初始许可应该为20,这样在第一秒可以立即处理20个请求。然后每秒补充到20。这可能有效,但需要测试是否正确。
然后,处理每个TID的任务:
private void processTid(int tid) {
try {
// 发送HTTP请求,检查内容
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://archives.mcbbs.co/read.php?tid=" + tid))
.header("User-Agent", "mcbbs-archives-indexer (@Ghost-chu [2908803755], speed 20req per seconds)")
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
String body = response.body();
if (!body.equals("文件不存在")) {
// 保存文件
Path archiveDir = Path.of("archive");
Files.createDirectories(archiveDir);
Path filePath = archiveDir.resolve(tid + ".html");
Files.writeString(filePath, body, StandardCharsets.UTF_8);
// 记录到links.txt
synchronized (this) {
Files.write(Path.of("archives/links.txt"), (tid + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
}
}
}
} catch (Exception e) {
// 处理异常,可能需要重试?
}
}
这里需要注意的是,保存文件和写入links.txt需要处理并发问题。因为多个线程可能同时写入同一个文件,所以需要同步。特别是links.txt的写入,必须保证每次追加是原子的。使用synchronized块可以确保同一时间只有一个线程写入文件。或者,可以使用一个单独的线程来处理写入links.txt的任务,将需要写入的TID放入队列,由该线程负责写入,避免同步问题。
另外,创建目录可能需要先检查是否存在,使用Files.createDirectories()可以自动创建不存在的目录。
对于保存HTML文件,如果文件已存在,是否需要覆盖?用户没说,可能需要覆盖,或者跳过。根据需求,可能需要覆盖,因为可能重新运行爬虫获取最新内容,但用户可能希望每次运行都重新下载,所以直接覆盖。
关于HTTP请求的错误处理,比如连接超时、重试等,用户可能希望有重试机制。但问题中没有提到,所以可能只需要捕获异常,记录日志,然后继续处理下一个TID。
现在,整合这些思路,编写代码的大致结构:
1. 创建必要的目录。
2. 初始化Semaphore用于速率控制。
3. 启动定时任务补充Semaphore的许可。
4. 使用虚拟线程池处理请求。
5. 循环获取TID,提交任务。
但需要注意,TID是连续递增的,可能需要处理很大的数值,原子整型可能会溢出,但Java的int是32位,所以当超过Integer.MAX_VALUE时会变成负数。这可能是个问题,用户可能需要使用long类型,但AtomicLong可以解决。
所以,将tid改为AtomicLong。
总结代码结构:
public class McbbsCrawler {
private static final String USER_AGENT = "mcbbs-archives-indexer (@Ghost-chu [2908803755], speed 20req per seconds)";
private static final Path ARCHIVE_DIR = Path.of("archive");
private static final Path LINKS_FILE = Path.of("archives/links.txt");
public static void main(String[] args) throws IOException {
// 创建目录
Files.createDirectories(ARCHIVE_DIR);
Files.createDirectories(LINKS_FILE.getParent());
// 初始化速率控制
Semaphore rateLimiter = new Semaphore(20);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
int available = rateLimiter.availablePermits();
if (available < 20) {
rateLimiter.release(20 - available);
}
}, 0, 1, TimeUnit.SECONDS);
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
AtomicLong tid = new AtomicLong(1);
while (true) {
try {
rateLimiter.acquire();
long currentTid = tid.getAndIncrement();
executor.submit(() -> processTid(currentTid));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
private static void processTid(long tid) {
try {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://archives.mcbbs.co/read.php?tid=" + tid))
.header("User-Agent", USER_AGENT)
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
String body = response.body();
if (!body.equals("文件不存在")) {
// 保存HTML文件
Path filePath = ARCHIVE_DIR.resolve(tid + ".html");
Files.writeString(filePath, body, StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
// 记录到links.txt
synchronized (McbbsCrawler.class) {
Files.writeString(LINKS_FILE, tid + "\n", StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
}
}
}
} catch (IOException e) {
System.err.println("Error processing TID " + tid + ": " + e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
这里需要注意几点:
1. 使用AtomicLong来避免整数溢出,尽管可能性很低。
2. 每次处理TID时,使用HttpClient发送同步请求,因为虚拟线程允许阻塞操作而不会影响其他线程。
3. 保存文件时使用TRUNCATE_EXISTING选项,覆盖已有文件。
4. 写入links.txt时使用同步块,确保并发安全。
5. 速率控制通过Semaphore每秒补充到20许可,每个请求前获取许可。
6. 主循环不断提交任务,由虚拟线程池处理。
可能的问题:
- 如果处理请求的时间超过1秒,Semaphore的补充会导致许可累积,从而超过20req/s。例如,如果某次请求耗时2秒,那么在这2秒内,许可会被补充两次,导致可用的许可超过20,从而在后续的请求中超过速率限制。因此,这种方法可能不适用于处理时间不稳定的情况。
解决办法可能需要使用更精确的速率控制,例如,使用一个令牌桶,其中每秒添加20个令牌,每个请求消耗一个令牌。但Java标准库中没有令牌桶实现,可以自己实现或者使用第三方库如Guava的RateLimiter。但用户可能希望避免第三方库,所以需要自己实现。
例如,维护一个时间队列,记录每个请求的开始时间,当队列中有20个请求时,等待直到最旧的请求时间超过1秒。这可能比较复杂,但可以精确控制速率。
不过这会增加实现的复杂性。或者,用户可能可以接受在平均情况下速率接近20/s,允许偶尔的超出。
另一种方法是使用ScheduledExecutorService来调度每个请求,间隔50ms。例如:
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(20);
AtomicLong tid = new AtomicLong(1);
Runnable task = () -> {
long currentTid = tid.getAndIncrement();
processTid(currentTid);
};
for (int i = 0; i < 20; i++) {
scheduler.scheduleAtFixedRate(task, i * 50, 1000, TimeUnit.MILLISECONDS);
}
这样,每个任务每秒钟运行一次,初始延迟分散开,例如第一个任务0ms,第二个50ms,依此类推,总共20个任务,每个间隔50ms,这样每秒总共20次执行。这可能更精确,但需要测试。
不过,这种方法假设每个请求处理时间不超过50ms,否则可能堆积任务。对于网络请求来说,响应时间可能不稳定,容易导致任务堆积,从而超过速率限制。
综合来看,使用Semaphore和每秒补充许可的方法可能在大多数情况下足够接近用户的要求,尽管在请求处理时间较长的情况下可能会有波动。
另外,需要处理程序的终止条件。当前代码是无限循环,可能需要添加某种条件,例如当连续N个TID无效时停止,但用户没有提到这点,所以可能不需要。
最后,关于UserAgent的设置是否正确,需要检查代码中的字符串是否与用户提供的完全一致。
综上,代码的大致结构应该如上所示,使用Semaphore控制速率,虚拟线程处理请求,同步块写入文件,确保符合用户的所有要求。
import java.net.URI;
import java.net.http.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class McbbsArchiver {
private static final String BASE_URL = "https://archives.mcbbs.co/read.php?tid=";
private static final String USER_AGENT = "mcbbs-archives-indexer (@Ghost-chu [2908803755], speed 20req per seconds)";
private static final Path ARCHIVE_DIR = Path.of("archive");
private static final Path LINKS_FILE = Path.of("archives/links.txt");
private static final int MAX_REQ_PER_SECOND = 20;
public static void main(String[] args) throws Exception {
// 初始化目录结构
Files.createDirectories(ARCHIVE_DIR);
Files.createDirectories(LINKS_FILE.getParent());
// 创建速率限制器(令牌桶算法实现)
TokenBucket rateLimiter = new TokenBucket(MAX_REQ_PER_SECOND);
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
AtomicLong tid = new AtomicLong(1);
while (true) {
rateLimiter.acquire();
long currentTid = tid.getAndIncrement();
executor.submit(() -> processTid(currentTid));
}
}
}
private static void processTid(long tid) {
try {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(BASE_URL + tid))
.header("User-Agent", USER_AGENT)
.timeout(Duration.ofSeconds(10))
.build();
HttpResponse<String> response = client.send(
request, HttpResponse.BodyHandlers.ofString()
);
if (response.statusCode() == 200) {
String body = response.body();
if (!body.trim().equals("文件不存在")) {
saveToArchive(tid, body);
recordValidLink(tid);
}
}
} catch (Exception e) {
System.err.printf("Error processing TID %d: %s%n", tid, e.getMessage());
}
}
private static void saveToArchive(long tid, String content) throws IOException {
Path filePath = ARCHIVE_DIR.resolve(tid + ".html");
Files.writeString(filePath, content,
StandardCharsets.UTF_8,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING
);
}
private static synchronized void recordValidLink(long tid) throws IOException {
Files.writeString(LINKS_FILE,
tid + System.lineSeparator(),
StandardCharsets.UTF_8,
StandardOpenOption.CREATE,
StandardOpenOption.APPEND
);
}
/**
* 令牌桶算法实现的速率限制器
*/
static class TokenBucket {
private final int capacity;
private int tokens;
private long lastRefillTime;
public TokenBucket(int tokensPerSecond) {
this.capacity = tokensPerSecond;
this.tokens = tokensPerSecond;
this.lastRefillTime = System.nanoTime();
}
public synchronized void acquire() {
refill();
while (tokens <= 0) {
try {
TimeUnit.NANOSECONDS.sleep(1_000_000); // 1ms
refill();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
tokens--;
}
private void refill() {
long now = System.nanoTime();
long elapsedNanos = now - lastRefillTime;
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
if (elapsedMillis >= 1000) {
int newTokens = (int) (elapsedMillis / 1000 * capacity);
tokens = Math.min(tokens + newTokens, capacity);
lastRefillTime = now;
}
}
}
}
方案说明
速率控制:
使用自实现的令牌桶算法(TokenBucket 类),严格保证 20 请求/秒的速率限制
比 Semaphore 方案更精确,避免突发请求
自动处理时间补偿,确保长期运行的稳定性
并发处理:
使用 Java 21+ 的虚拟线程(Virtual Thread)
通过 Executors.newVirtualThreadPerTaskExecutor() 创建高效的线程池
每个请求在独立的虚拟线程中处理,资源消耗低
错误处理:
设置 10 秒超时时间
自动跳过无效响应(包含 "文件不存在" 的响应)
错误日志输出到标准错误流
文件存储:
自动创建必要的目录结构
使用同步方法保证 links.txt 的线程安全写入
HTML 文件使用覆盖写入模式(TRUNCATE_EXISTING)
User-Agent:
严格按要求设置请求头
包含完整的标识信息
使用说明
需要 Java 21 或更高版本运行
编译命令:
bash
复制
javac McbbsArchiver.java
运行命令:
bash
复制
java McbbsArchiver
程序会自动创建以下目录结构:
复制
├── archive/
│ └── {TID}.html
└── archives/
└── links.txt
注意事项
程序会无限运行,需要手动终止(Ctrl+C)
网络异常时会自动重试(内置在 HttpClient 的重试机制)
文件写入使用同步锁保证线程安全
令牌桶算法确保严格的速率限制,即使在网络不稳定的情况下也能维持平均速率
这个实现方案在保证功能需求的同时,充分考虑了性能和资源利用率,适合长期稳定运行的大规模爬取任务。
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment