当前位置: 首页 > 面试题库 >

分布式事务如何在线程环境中与同一个数据库建立多个连接?

司凡
2023-03-14
问题内容

我试图确定分布式事务中多个数据库连接的行为。

我有一个运行很长时间的过程,它产生了一系列线程,然后每个线程负责管理其DB连接等。所有这些都在事务作用域内运行,并且每个线程都通过一个DependentTransaction对象加入到事务中。

当我并行执行此过程时,我遇到了几个问题,即似乎存在某种阻止查询在事务上同时执行的块。

我想知道的是事务协调器如何处理从多个连接到同一数据库的查询,甚至是否建议跨线程传递连接对象?

我读过MS
SQL在每个事务中只允许一个连接,但是我显然能够在同一事务中创建和初始化一个以上连接到同一DB的连接。打开连接时,如果没有获取“另一个会话正在使用的事务上下文”异常,我将无法并行执行线程。结果是连接必须等待执行,而不是同时运行,最终代码运行完成,但是由于存在锁定问题,因此对应用程序进行线程化没有任何好处。

该代码看起来像这样。

    Sub StartThreads()
        Using Scope As New TransactionScope
            Dim TL(100) As Tasks.Task
            Dim dTx As DependentTransaction
            For i As Int32 = 0 To 100
                Dim A(1) As Object
                dTx = CType(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete), DependentTransaction)
                'A(0) = some_other_data
                A(1) = dTx 'the Dependent Transaction

                TL(i) = Tasks.Task.Factory.StartNew(AddressOf Me.ProcessData, A) 'Start the thread and add it to the array
            Next

            Tasks.Task.WaitAll(TL) 'Wait for threads to finish

            Scope.Complete()
        End Using
    End Sub
    Dim TransLock As New Object
    Sub ProcessData(ByVal A As Object)
        Dim DTX As DependentTransaction = A(1)
        Dim Trans As Transactions.TransactionScope
        Dim I As Int32
        Do While True
            Try
                SyncLock (TransLock)
                    Trans = New Transactions.TransactionScope(DTX, TimeSpan.FromMinutes(1))
                End SyncLock
                Exit Do
            Catch ex As TransactionAbortedException
                If ex.ToString.Contains("Failure while attempting to promote transaction") Then
                ElseIf ex.Message = "The transaction has aborted." Then
                    Throw New Exception(ex.ToString)
                    Exit Sub
                End If
                I += 1
                If I > 5 Then
                    Throw New Exception(ex.ToString)
                End If
            Catch ex As Exception

            End Try
            Thread.Sleep(10)
        Loop
        Using Trans
            Using DALS As New DAC.DALScope
                Do While True
                    Try
                        SyncLock (TransLock)
                            'This opens two connection to the same DB for later use.
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.FirstConnection)
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.SecondConnection)
                        End SyncLock
                        Exit Do
                    Catch ex As Exception
                        'This is usually where I find the bottleneck
                        '"Transaction context in use by another session" is the exception that I get
                        Thread.Sleep(100)
                    End Try
                Loop

                '*****************
                'Do some work here
                '*****************

                Trans.Complete()
            End Using
        End Using
        DTX.Complete()
    End Sub

编辑

我的测试最终表明,这是无法完成的。即使存在多个连接或使用相同的连接,事务中的所有请求或问题也会被顺序处理。

也许他们将来会改变这种行为。


问题答案:

首先,您必须将在这里和那里读到的有关SQL Server事务的内容分为两种不同的情况:本地和分布式。

本地SQL事务

  • SQL Server只允许在每个本地事务上执行一个请求。
  • 默认情况下,只有一个会话可以注册本地事务。使用sp_getbindtoken和sp_bindsession可以将多个会话注册到本地事务中。会话仍然限于在任何时间仅执行一个请求。
  • 使用多个活动结果集(MARS),一个会话可以执行多个请求。所有请求都必须注册在同一本地事务中。

分布式事务

  • 多个会话可以将其本地事务注册到单个分布式事务中。
  • 每个会话仍在本地交易中注册,但要遵守上述本地交易的所有限制
  • 分布式事务中注册的本地事务受分布式事务协调的两阶段提交的约束
  • 注册到分布式事务中的实例上的所有本地事务仍然是 独立的 本地事务,这主要意味着它们具有冲突的锁命名空间。

因此,当客户端创建.Net
TransactionScope并在此事务范围下,它在同一服务器上执行多个请求时,这些请求都是注册在分布式事务中的所有本地事务。一个简单的例子:

class Program
    {
        static string sqlBatch = @"
set nocount on;
declare @i int;
set @i = 0;
while @i < 100000
begin
    insert into test (a) values (replicate('a',100));
    set @i = @i+1;
end";

        static void Main(string[] args)
        {
            try
            {
                TransactionOptions to = new TransactionOptions();
                to.IsolationLevel = IsolationLevel.ReadCommitted;
                using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
                {
                    using (SqlConnection connA = new SqlConnection(Settings.Default.connString))
                    {
                        connA.Open();
                        using (SqlConnection connB = new SqlConnection(Settings.Default.connString))
                        {
                            connB.Open();

                            SqlCommand cmdA = new SqlCommand(sqlBatch, connA);
                            SqlCommand cmdB = new SqlCommand(sqlBatch, connB);

                            IAsyncResult arA = cmdA.BeginExecuteNonQuery();
                            IAsyncResult arB = cmdB.BeginExecuteNonQuery();

                            WaitHandle.WaitAll(new WaitHandle[] { arA.AsyncWaitHandle, arB.AsyncWaitHandle });

                            cmdA.EndExecuteNonQuery(arA);
                            cmdB.EndExecuteNonQuery(arB);
                        }
                    }
                    scp.Complete();
                }
            }
            catch (Exception e)
            {
                Console.Error.Write(e);
            }
        }
    }

创建一个虚拟测试表:

create table test (id int not null identity(1,1) primary key, a varchar(100));

并运行示例中的代码。您将看到两个请求并行执行,每个请求浪费表中的100k行,然后在事务范围完成时都提交。因此,您看到的问题与SQL
Server或TransactionScope都不相关,它们可以轻松处理您描述的情况。而且,该代码非常简单明了,并且不需要创建依赖事务,进行克隆或促进事务。

更新

使用显式线程和相关事务:

 private class ThreadState
    {
        public DependentTransaction Transaction {get; set;}
        public EventWaitHandle Done {get; set;}
        public SqlConnection Connection { get; set; }
    }
    static void Main(string[] args)
    {
        try
        {
            TransactionOptions to = new TransactionOptions();
            to.IsolationLevel = IsolationLevel.ReadCommitted;
            using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
            {
                ThreadState stateA = new ThreadState 
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateA.Connection.Open();
                ThreadState stateB = new ThreadState
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateB.Connection.Open();

                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateA);
                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateB);

                WaitHandle.WaitAll(new WaitHandle[] { stateA.Done, stateB.Done });

                scp.Complete();

                //TODO: dispose the open connections
            }

        }
        catch (Exception e)
        {
            Console.Error.Write(e);
        }
    }

    private static void Worker(object args)
    {
        Debug.Assert(args is ThreadState);
        ThreadState state = (ThreadState) args;
        try
        {
            using (TransactionScope scp = new TransactionScope(state.Transaction))
            {
                SqlCommand cmd = new SqlCommand(sqlBatch, state.Connection);
                cmd.ExecuteNonQuery();
                scp.Complete();
            }
            state.Transaction.Complete();
        }
        catch (Exception e)
        {
            Console.Error.WriteLine(e);
            state.Transaction.Rollback();
        }
        finally
        {
            state.Done.Set();
        }

    }


 类似资料:
  • 我们设置了开发环境,以便Hibernate每次启动我们的应用程序时都会创建一个新的空数据库: 这适用于部署在服务器上的单个应用程序,但在我们的开发环境中,我们通常部署连接到同一数据库的多个应用程序。它们都使用来自应用程序服务器的相同数据源。 问题在于,当它们在 JBoss AS7 中启动时,它们是并行部署的,因此两个应用程序服务器都尝试同时创建表。我们得到这样的东西(带有匿名的表格和列名): 这些

  • 问题内容: 我有一个使用在不同地理位置的四个数据库的应用程序。所有数据库都包含相同的表,只有数据库名称根据位置而不同。我必须在应用程序中创建一些报告,这些报告使用每个数据库中的数据。从Java应用程序创建那些数据库连接的正确方法是什么,是否有适合我使用的适合此任务的设计模式? 问题答案: 由于您没有任何的标记这个你的问题,,,,我假设你正在处理普通的JDBC。 话虽如此,我建议您有一个DAO层来处

  • 1. 前言 一个项目中使用多个数据源的需求,我们在日常工作中时常会遇到。 以商城系统为例,有一个 MySQL 的数据库负责存储交易数据。公司还有一套 ERP 企业信息化管理系统,要求订单信息同步录入 ERP 数据库,便于公司统一管理,而该 ERP 系统采用的数据库为 SQL Server 。 此时,就可以在 Spring Boot 项目中配置多个数据源。另外,使用多数据源后,需要采用分布式事务来保

  • 面试问题 比如说,我们有一个在Employee表中有200万条记录的表,我们需要削减每个员工10%的工资(需要做一些处理),然后将其保存回collection。你怎样才能有效地做到这一点。 我问他,我们可以使用executor框架来创建多个线程,这些线程可以从表中获取值,然后我们可以处理并将其保存到列表中。 然后他问我,你将如何检查一个记录是否已经被处理,我不知道(如何做)。 甚至我也不确定我是否

  • 问题内容: 我们遇到了适用于多线程的方案。 在主线程中,执行一些逻辑操作并更新数据库,在某种程度上,它将调用另一个服务来更新数据库,该服务在另一个线程中运行。 我们希望两个线程共享同一个事务,这意味着任何一个线程中的任何一个操作都将失败,那么另一个线程中的该操作也将被回滚。 但是工作了几天,我发现一些帖子说JTA不支持多线程。当前我们使用Bitronix作为JTA提供者,有没有人知道Bitroni