当前位置: 首页 > 知识库问答 >
问题:

并发-两个线程从不同的服务器查询多个数据库并同步比较每个记录

颜欣怡
2023-03-14

我想有两个线程查询(JDBC)两个表(来自不同的服务器/数据库但相关)以获取有序输出,然后比较它们或逐条应用一些逻辑记录。

表的大小可能非常大,所以我认为使用线程是以最少的占用空间完成这一任务的最有效方法。

示例:

Thread1 -查询表server 1 . database 1 . schema 1 . tablea按1排序;

线程 2 - 查询表服务器 2.database2.schema2.tableB,其中 [与 A 相关的条件/逻辑] 按 1 排序;

在两个线程中的ResultSet中的每条记录上同步并应用比较或数据逻辑。

例如:Thread1=[1,2,3,4,5]的ResultSet

线程2的结果集=[2,4,6,8,10]

我希望能够在每个索引(0…4)上同步,并比较它们。说Thread1。ResultSet[0]=Thread2。ResultSet[0]/2。

这意味着:

1 = 2/2

2=4/2

等…

这是我到目前为止得到的,基于我在研究时得到的另一个答案。我正在使用原子整数来同步ResultSet迭代。

//Main class
public class App {
    public static void main(String[] args) {
        try {
            ReaderThread t1 = new ReaderThread();
            ReaderThread t2 = new ReaderThread();
            List<ReaderThread> list = new ArrayList<ReaderThread>();
            list.add(t1);
            list.add(t2);
            
            HelperThread helperThread = new HelperThread(list);
            helperThread.start();
            
            t1.setName("Reader1");
            t2.setName("Reader2");
            t1.start();
            t2.start();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

//Database ReaderThread 
public class ReaderThread extends Thread {

    private DatabaseAccessLayer dal = new DatabaseAccessLayer(); //access layer to instantiate connection, statement and execute query and return ResultSet
    private ResultSet rs;
    private final Object hold = new Object();
    private final AtomicInteger lineCount = new AtomicInteger(0);
    private String currentLine;

    public ReaderThread() throws SQLException {
        this.rs = dal.executeStatement(); //execute SQL query on instantiation and get the resultset
    }

    @Override
    public void run() {
        synchronized (hold) {
            try {
                while (rs.next()) {
                    currentLine = rs.getString(1) + rs.getString(2) + rs.getString(3) + rs.getString(4)
                            + rs.getString(5) + rs.getString(5);
                    lineCount.getAndIncrement();
                    System.out.println(this.getName() + " ||| " + currentLine);
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }

        }
    }

    public void hold () throws InterruptedException {
        this.hold.wait();
    }

    public void release() {
        this.hold.notify();
    }

    public boolean isLocked() {
        return getState().equals(State.WAITING);
    }

    public Object getHold() {
        return hold;
    }

    public AtomicInteger getLineCount() {
        return lineCount;
    }

    public String getCurrentLine() {
        return currentLine;
    }

}

// THe helper class which look at two threads and determine lock conditions and subsequence logic
public class HelperThread extends Thread {
    private List<ReaderThread> threads;

    @Override
    public void run() {
        while (true) {
            threads.forEach(t -> {
                try {
                    int r1 = 0;
                    int r2 = 0;
                    //======== lock and synchronize logic here =========
                    if (t.getName().equals("Reader1")) r1 = t.getLineCount().get();
                    if (t.getName().equals("Reader2")) r2 = t.getLineCount().get();
                    if (t.getName().equals("Reader1") && r1 == r2) t.hold();
                    if (t.getName().equals("Reader2") && r2 == r1) t.hold();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            if (threads.stream().allMatch(ReaderThread::isLocked)) {
                System.out.println("next line:");

                threads.forEach(t -> {
                    synchronized (t.getLock()) {
                        System.out.println(t.getCurrentLine());
                        t.release();
                    }
                });

                System.out.println("\n");
            }
        }

    }

    public HelperThread(List<ReaderThread> threads) {
        this.threads = threads;
    }
}

上面的代码能够在表上并发执行查询,并打印出每个表的结果集。但是,锁定/保持逻辑不起作用。当两个线程中的 AtomicInteger 变量相同时,我正在尝试将线程置于暂停状态。这在我的代码中意味着它将逐个循环访问结果集。对于每个线程,AtomicInteger 变量都会递增,并将等待到另一个线程的 AtomicInteger 变量达到相同的值。然后比较逻辑发生,然后两个线程都被释放以继续前进。

我不确定如果原子整数是正确的用法在这里。

任何建议都非常感谢。

共有1个答案

鲜于星波
2023-03-14

一个好的解决方案可以是使用两个ArrayBlockingQueue作为缓冲区

ArrayBlockingQueue db1Buf=new ArrayBlockingQueue

读取线程只是向缓冲区提供行

while (rs.next()) {
        MyData data=new MyData(rs.getString(1),rs.getString(2)...);           
        db1Buf.offer(data); //this waits if the buffer is full
 }
  db1Buf.offer(null); //signal the end of table

第三个线程处理数据

for(;;) {
     MyData db1Record=db1Buf.take();
     MyData db2Record=db2Buf.take();
     if (db1Record==null || db2Record==null) 
         break;
     
   // do something with db1Record and db2Record

}

不需要同步,因为 ArrayBlockingQueue 已经同步。

读取线程将馈送缓冲区并块(如果它们已满),如果缓冲区为空,则第三个线程将使用数据等待其他线程读取数据。MyData类是一个简单的bean,其中包含您需要的字段。

 类似资料:
  • 多个同步服务器 Since you have full control of express instance lifecycle, it's not a problem to create a few multiple simultaneous servers (e.g. both HTTP & HTTPS). Example: 因为你已经可以完全控制express实例的生命周期了,所以创建多个

  • 问题内容: 我正在寻找一种处理以下情况的方法: 我们有一个数据库服务器,上面有多个数据库(所有数据库都有相同的架构,不同的数据)。 我们正在寻找一种查询所有数据库的方法(并且它易于配置,因为可以随时添加更多数据库)。此数据访问必须是实时的。 举例来说,假设您有一个插入订单的应用程序- 每个应用程序都有自己的数据库等。我们正在寻找的是一种有效的方式,使单个应用程序可以访问所有其他数据库中的订单信息,

  • 使用JDBC驱动程序,我如何使用来自不同查询的多个结果集,而不不断地打开和关闭连接,因为我正在提取所需的w.e并将其传递给另一个方法。每次打开新的conn、语句和结果集时 我试图在一个方法中使用多个结果集,但它一直抛出异常,称结果集已关闭。我没有太多的SqlServver经验,所以任何指导都会有所帮助:-)

  • 同步 同步指的是线程之间的协作配合,以共同完成某个任务。在整个过程中,需要注意两个关键点:一是共享资源的访问, 二是访问资源的顺序。通过前面的介绍,我们已经知道了如何让多个线程访问共享资源,但并没介绍如何控制访问顺序,才不会出现错误。如果两个线程同时访问同一内存地址的数据,一个写,一个读,如果不加控制,写线程只写了一半,读线程就开始读,必然读到的数据是错误的,不可用的,从而造成程序错误,这就造成了

  • 问题内容: 我需要比较2个不同数据库中的数据库表,以了解差异所在,是否有一个简单的工具或脚本来实现? 问题答案: redgate SQL数据比较

  • 我有两个哈希数组: 我想在< code>a2中找到其< code>ID和< code>name字段与< code>a1中条目的< code>ID和< code>name字段相匹配的散列(不考虑< code>email或任何其他进入< code>a2的项目),然后将< code>ORDER_NO的值合并到< code>a1散列中也就是说,以下列方式结束: 我也想忽略 a2 中存在的元素,但不忽略 a